mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #4613 from heyitsanthony/clientv3-composite
clientv3: compose API interfaces into client struct
This commit is contained in:
commit
4fb25d5f0e
@ -24,7 +24,6 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials"
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
||||||
"github.com/coreos/etcd/pkg/transport"
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -34,14 +33,10 @@ var (
|
|||||||
|
|
||||||
// Client provides and manages an etcd v3 client session.
|
// Client provides and manages an etcd v3 client session.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
// KV is the keyvalue API for the client's connection.
|
Cluster
|
||||||
KV pb.KVClient
|
KV
|
||||||
// Lease is the lease API for the client's connection.
|
Lease
|
||||||
Lease pb.LeaseClient
|
Watcher
|
||||||
// Watch is the watch API for the client's connection.
|
|
||||||
Watch pb.WatchClient
|
|
||||||
// Cluster is the cluster API for the client's connection.
|
|
||||||
Cluster pb.ClusterClient
|
|
||||||
|
|
||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
cfg Config
|
cfg Config
|
||||||
@ -86,6 +81,8 @@ func NewFromURL(url string) (*Client, error) {
|
|||||||
|
|
||||||
// Close shuts down the client's etcd connections.
|
// Close shuts down the client's etcd connections.
|
||||||
func (c *Client) Close() error {
|
func (c *Client) Close() error {
|
||||||
|
c.Watcher.Close()
|
||||||
|
c.Lease.Close()
|
||||||
return c.conn.Close()
|
return c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,15 +143,17 @@ func newClient(cfg *Config) (*Client, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Client{
|
client := &Client{
|
||||||
KV: pb.NewKVClient(conn),
|
conn: conn,
|
||||||
Lease: pb.NewLeaseClient(conn),
|
cfg: *cfg,
|
||||||
Watch: pb.NewWatchClient(conn),
|
creds: creds,
|
||||||
Cluster: pb.NewClusterClient(conn),
|
}
|
||||||
conn: conn,
|
client.Cluster = NewCluster(client)
|
||||||
cfg: *cfg,
|
client.KV = NewKV(client)
|
||||||
creds: creds,
|
client.Lease = NewLease(client)
|
||||||
}, nil
|
client.Watcher = NewWatcher(client)
|
||||||
|
|
||||||
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ActiveConnection returns the current in-use connection
|
// ActiveConnection returns the current in-use connection
|
||||||
|
@ -40,9 +40,7 @@ func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption
|
|||||||
}
|
}
|
||||||
|
|
||||||
func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error {
|
func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error {
|
||||||
w := v3.NewWatcher(client)
|
wc := client.Watch(ctx, key, opts...)
|
||||||
defer w.Close()
|
|
||||||
wc := w.Watch(ctx, key, opts...)
|
|
||||||
if wc == nil {
|
if wc == nil {
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,6 @@ import (
|
|||||||
// Mutex implements the sync Locker interface with etcd
|
// Mutex implements the sync Locker interface with etcd
|
||||||
type Mutex struct {
|
type Mutex struct {
|
||||||
client *v3.Client
|
client *v3.Client
|
||||||
kv v3.KV
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
pfx string
|
pfx string
|
||||||
@ -33,7 +32,7 @@ type Mutex struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewMutex(ctx context.Context, client *v3.Client, pfx string) *Mutex {
|
func NewMutex(ctx context.Context, client *v3.Client, pfx string) *Mutex {
|
||||||
return &Mutex{client, v3.NewKV(client), ctx, pfx, "", -1}
|
return &Mutex{client, ctx, pfx, "", -1}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
||||||
@ -44,12 +43,12 @@ func (m *Mutex) Lock(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// put self in lock waiters via myKey; oldest waiter holds lock
|
// put self in lock waiters via myKey; oldest waiter holds lock
|
||||||
m.myKey, m.myRev, err = NewUniqueKey(ctx, m.kv, m.pfx, v3.WithLease(s.Lease()))
|
m.myKey, m.myRev, err = NewUniqueKey(ctx, m.client, m.pfx, v3.WithLease(s.Lease()))
|
||||||
// wait for lock to become available
|
// wait for lock to become available
|
||||||
for err == nil {
|
for err == nil {
|
||||||
// find oldest element in waiters via revision of insertion
|
// find oldest element in waiters via revision of insertion
|
||||||
var resp *v3.GetResponse
|
var resp *v3.GetResponse
|
||||||
resp, err = m.kv.Get(ctx, m.pfx, v3.WithFirstRev()...)
|
resp, err = m.client.Get(ctx, m.pfx, v3.WithFirstRev()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -59,7 +58,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
// otherwise myKey isn't lowest, so there must be a pfx prior to myKey
|
// otherwise myKey isn't lowest, so there must be a pfx prior to myKey
|
||||||
opts := append(v3.WithLastRev(), v3.WithRev(m.myRev-1))
|
opts := append(v3.WithLastRev(), v3.WithRev(m.myRev-1))
|
||||||
resp, err = m.kv.Get(ctx, m.pfx, opts...)
|
resp, err = m.client.Get(ctx, m.pfx, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -80,7 +79,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mutex) Unlock() error {
|
func (m *Mutex) Unlock() error {
|
||||||
if _, err := m.kv.Delete(m.ctx, m.myKey); err != nil {
|
if _, err := m.client.Delete(m.ctx, m.myKey); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m.myKey = "\x00"
|
m.myKey = "\x00"
|
||||||
|
@ -49,15 +49,14 @@ func NewSession(client *v3.Client) (*Session, error) {
|
|||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lc := v3.NewLease(client)
|
resp, err := client.Create(context.TODO(), sessionTTL)
|
||||||
resp, err := lc.Create(context.TODO(), sessionTTL)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
id := lease.LeaseID(resp.ID)
|
id := lease.LeaseID(resp.ID)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
keepAlive, err := lc.KeepAlive(ctx, id)
|
keepAlive, err := client.KeepAlive(ctx, id)
|
||||||
if err != nil || keepAlive == nil {
|
if err != nil || keepAlive == nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -72,7 +71,6 @@ func NewSession(client *v3.Client) (*Session, error) {
|
|||||||
clientSessions.mu.Lock()
|
clientSessions.mu.Lock()
|
||||||
delete(clientSessions.sessions, client)
|
delete(clientSessions.sessions, client)
|
||||||
clientSessions.mu.Unlock()
|
clientSessions.mu.Unlock()
|
||||||
lc.Close()
|
|
||||||
close(donec)
|
close(donec)
|
||||||
}()
|
}()
|
||||||
for range keepAlive {
|
for range keepAlive {
|
||||||
@ -101,6 +99,6 @@ func (s *Session) Orphan() {
|
|||||||
// Close orphans the session and revokes the session lease.
|
// Close orphans the session and revokes the session lease.
|
||||||
func (s *Session) Close() error {
|
func (s *Session) Close() error {
|
||||||
s.Orphan()
|
s.Orphan()
|
||||||
_, err := v3.NewLease(s.client).Revoke(context.TODO(), s.id)
|
_, err := s.client.Revoke(context.TODO(), s.id)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -32,9 +32,7 @@ func ExampleCluster_memberList() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
capi := clientv3.NewCluster(cli)
|
resp, err := cli.MemberList(context.Background())
|
||||||
|
|
||||||
resp, err := capi.MemberList(context.Background())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -52,9 +50,7 @@ func ExampleCluster_memberLeader() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
capi := clientv3.NewCluster(cli)
|
resp, err := cli.MemberLeader(context.Background())
|
||||||
|
|
||||||
resp, err := capi.MemberLeader(context.Background())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -72,10 +68,8 @@ func ExampleCluster_memberAdd() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
capi := clientv3.NewCluster(cli)
|
|
||||||
|
|
||||||
peerURLs := endpoints[2:]
|
peerURLs := endpoints[2:]
|
||||||
mresp, err := capi.MemberAdd(context.Background(), peerURLs)
|
mresp, err := cli.MemberAdd(context.Background(), peerURLs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -93,14 +87,12 @@ func ExampleCluster_memberRemove() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
capi := clientv3.NewCluster(cli)
|
resp, err := cli.MemberList(context.Background())
|
||||||
|
|
||||||
resp, err := capi.MemberList(context.Background())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = capi.MemberRemove(context.Background(), resp.Members[0].ID)
|
_, err = cli.MemberRemove(context.Background(), resp.Members[0].ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -116,15 +108,13 @@ func ExampleCluster_memberUpdate() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
capi := clientv3.NewCluster(cli)
|
resp, err := cli.MemberList(context.Background())
|
||||||
|
|
||||||
resp, err := capi.MemberList(context.Background())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
peerURLs := []string{"http://localhost:12378"}
|
peerURLs := []string{"http://localhost:12378"}
|
||||||
_, err = capi.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs)
|
_, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -32,10 +32,8 @@ func ExampleKV_put() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
resp, err := kvc.Put(ctx, "sample_key", "sample_value")
|
resp, err := cli.Put(ctx, "sample_key", "sample_value")
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -54,15 +52,13 @@ func ExampleKV_get() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
_, err = cli.Put(context.TODO(), "foo", "bar")
|
||||||
|
|
||||||
_, err = kvc.Put(context.TODO(), "foo", "bar")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
resp, err := kvc.Get(ctx, "foo")
|
resp, err := cli.Get(ctx, "foo")
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -83,11 +79,9 @@ func ExampleKV_getSortedPrefix() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
|
||||||
|
|
||||||
for i := range make([]int, 3) {
|
for i := range make([]int, 3) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
_, err = kvc.Put(ctx, fmt.Sprintf("key_%d", i), "value")
|
_, err = cli.Put(ctx, fmt.Sprintf("key_%d", i), "value")
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -95,7 +89,7 @@ func ExampleKV_getSortedPrefix() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
resp, err := kvc.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
|
resp, err := cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -118,10 +112,8 @@ func ExampleKV_delete() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
resp, err := kvc.Delete(ctx, "key", clientv3.WithPrefix())
|
resp, err := cli.Delete(ctx, "key", clientv3.WithPrefix())
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -140,10 +132,8 @@ func ExampleKV_compact() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
resp, err := kvc.Get(ctx, "foo")
|
resp, err := cli.Get(ctx, "foo")
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -151,7 +141,7 @@ func ExampleKV_compact() {
|
|||||||
compRev := resp.Header.Revision // specify compact revision of your choice
|
compRev := resp.Header.Revision // specify compact revision of your choice
|
||||||
|
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
|
||||||
err = kvc.Compact(ctx, compRev)
|
err = cli.Compact(ctx, compRev)
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -207,15 +197,13 @@ func ExampleKV_do() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
|
||||||
|
|
||||||
ops := []clientv3.Op{
|
ops := []clientv3.Op{
|
||||||
clientv3.OpPut("put-key", "123"),
|
clientv3.OpPut("put-key", "123"),
|
||||||
clientv3.OpGet("put-key"),
|
clientv3.OpGet("put-key"),
|
||||||
clientv3.OpPut("put-key", "456")}
|
clientv3.OpPut("put-key", "456")}
|
||||||
|
|
||||||
for _, op := range ops {
|
for _, op := range ops {
|
||||||
if _, err := kvc.Do(context.TODO(), op); err != nil {
|
if _, err := cli.Do(context.TODO(), op); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,18 +33,14 @@ func ExampleLease_create() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
|
||||||
lapi := clientv3.NewLease(cli)
|
|
||||||
defer lapi.Close()
|
|
||||||
|
|
||||||
// minimum lease TTL is 5-second
|
// minimum lease TTL is 5-second
|
||||||
resp, err := lapi.Create(context.TODO(), 5)
|
resp, err := cli.Create(context.TODO(), 5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// after 5 seconds, the key 'foo' will be removed
|
// after 5 seconds, the key 'foo' will be removed
|
||||||
_, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
|
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -60,27 +56,23 @@ func ExampleLease_revoke() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
resp, err := cli.Create(context.TODO(), 5)
|
||||||
lapi := clientv3.NewLease(cli)
|
|
||||||
defer lapi.Close()
|
|
||||||
|
|
||||||
resp, err := lapi.Create(context.TODO(), 5)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
|
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// revoking lease expires the key attached to its lease ID
|
// revoking lease expires the key attached to its lease ID
|
||||||
_, err = lapi.Revoke(context.TODO(), lease.LeaseID(resp.ID))
|
_, err = cli.Revoke(context.TODO(), lease.LeaseID(resp.ID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
gresp, err := kvc.Get(context.TODO(), "foo")
|
gresp, err := cli.Get(context.TODO(), "foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -98,22 +90,18 @@ func ExampleLease_keepAlive() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
resp, err := cli.Create(context.TODO(), 5)
|
||||||
lapi := clientv3.NewLease(cli)
|
|
||||||
defer lapi.Close()
|
|
||||||
|
|
||||||
resp, err := lapi.Create(context.TODO(), 5)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
|
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// the key 'foo' will be kept forever
|
// the key 'foo' will be kept forever
|
||||||
_, err = lapi.KeepAlive(context.TODO(), lease.LeaseID(resp.ID))
|
_, err = cli.KeepAlive(context.TODO(), lease.LeaseID(resp.ID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -129,22 +117,18 @@ func ExampleLease_keepAliveOnce() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
resp, err := cli.Create(context.TODO(), 5)
|
||||||
lapi := clientv3.NewLease(cli)
|
|
||||||
defer lapi.Close()
|
|
||||||
|
|
||||||
resp, err := lapi.Create(context.TODO(), 5)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
|
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// to renew the lease only once
|
// to renew the lease only once
|
||||||
_, err = lapi.KeepAliveOnce(context.TODO(), lease.LeaseID(resp.ID))
|
_, err = cli.KeepAliveOnce(context.TODO(), lease.LeaseID(resp.ID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -38,9 +38,7 @@ func Example() {
|
|||||||
}
|
}
|
||||||
defer cli.Close() // make sure to close the client
|
defer cli.Close() // make sure to close the client
|
||||||
|
|
||||||
kvc := clientv3.NewKV(cli)
|
_, err = cli.Put(context.TODO(), "foo", "bar")
|
||||||
|
|
||||||
_, err = kvc.Put(context.TODO(), "foo", "bar")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -32,10 +32,7 @@ func ExampleWatcher_watch() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
wc := clientv3.NewWatcher(cli)
|
rch := cli.Watch(context.Background(), "foo")
|
||||||
defer wc.Close()
|
|
||||||
|
|
||||||
rch := wc.Watch(context.Background(), "foo")
|
|
||||||
for wresp := range rch {
|
for wresp := range rch {
|
||||||
for _, ev := range wresp.Events {
|
for _, ev := range wresp.Events {
|
||||||
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
|
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
|
||||||
@ -54,10 +51,7 @@ func ExampleWatcher_watchPrefix() {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
wc := clientv3.NewWatcher(cli)
|
rch := cli.Watch(context.Background(), "foo", clientv3.WithPrefix())
|
||||||
defer wc.Close()
|
|
||||||
|
|
||||||
rch := wc.Watch(context.Background(), "foo", clientv3.WithPrefix())
|
|
||||||
for wresp := range rch {
|
for wresp := range rch {
|
||||||
for _, ev := range wresp.Events {
|
for _, ev := range wresp.Events {
|
||||||
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
|
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
|
||||||
|
@ -48,10 +48,9 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
|
|||||||
respchan := make(chan clientv3.GetResponse, 1024)
|
respchan := make(chan clientv3.GetResponse, 1024)
|
||||||
errchan := make(chan error, 1)
|
errchan := make(chan error, 1)
|
||||||
|
|
||||||
kapi := clientv3.NewKV(s.c)
|
|
||||||
// if rev is not specified, we will choose the most recent revision.
|
// if rev is not specified, we will choose the most recent revision.
|
||||||
if s.rev == 0 {
|
if s.rev == 0 {
|
||||||
resp, err := kapi.Get(ctx, "foo")
|
resp, err := s.c.Get(ctx, "foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errchan <- err
|
errchan <- err
|
||||||
close(respchan)
|
close(respchan)
|
||||||
@ -83,7 +82,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
|
|||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
resp, err := kapi.Get(ctx, key, opts...)
|
resp, err := s.c.Get(ctx, key, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errchan <- err
|
errchan <- err
|
||||||
return
|
return
|
||||||
@ -106,21 +105,5 @@ func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {
|
|||||||
if s.rev == 0 {
|
if s.rev == 0 {
|
||||||
panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?")
|
panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?")
|
||||||
}
|
}
|
||||||
|
return s.c.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev))
|
||||||
respchan := make(chan clientv3.WatchResponse, 1024)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
wapi := clientv3.NewWatcher(s.c)
|
|
||||||
defer wapi.Close()
|
|
||||||
defer close(respchan)
|
|
||||||
|
|
||||||
// get all events since revision (or get non-compacted revision, if
|
|
||||||
// rev is too far behind)
|
|
||||||
wch := wapi.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev))
|
|
||||||
for wr := range wch {
|
|
||||||
respchan <- wr
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return respchan
|
|
||||||
}
|
}
|
||||||
|
@ -24,32 +24,31 @@ import (
|
|||||||
// release all blocked processes.
|
// release all blocked processes.
|
||||||
type Barrier struct {
|
type Barrier struct {
|
||||||
client *v3.Client
|
client *v3.Client
|
||||||
kv v3.KV
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBarrier(client *v3.Client, key string) *Barrier {
|
func NewBarrier(client *v3.Client, key string) *Barrier {
|
||||||
return &Barrier{client, v3.NewKV(client), context.TODO(), key}
|
return &Barrier{client, context.TODO(), key}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hold creates the barrier key causing processes to block on Wait.
|
// Hold creates the barrier key causing processes to block on Wait.
|
||||||
func (b *Barrier) Hold() error {
|
func (b *Barrier) Hold() error {
|
||||||
_, err := NewKey(b.kv, b.key, 0)
|
_, err := NewKey(b.client, b.key, 0)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release deletes the barrier key to unblock all waiting processes.
|
// Release deletes the barrier key to unblock all waiting processes.
|
||||||
func (b *Barrier) Release() error {
|
func (b *Barrier) Release() error {
|
||||||
_, err := b.kv.Delete(b.ctx, b.key)
|
_, err := b.client.Delete(b.ctx, b.key)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait blocks on the barrier key until it is deleted. If there is no key, Wait
|
// Wait blocks on the barrier key until it is deleted. If there is no key, Wait
|
||||||
// assumes Release has already been called and returns immediately.
|
// assumes Release has already been called and returns immediately.
|
||||||
func (b *Barrier) Wait() error {
|
func (b *Barrier) Wait() error {
|
||||||
resp, err := b.kv.Get(b.ctx, b.key, v3.WithFirstKey()...)
|
resp, err := b.client.Get(b.ctx, b.key, v3.WithFirstKey()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,6 @@ import (
|
|||||||
// blocks again on Leave until all processes have left.
|
// blocks again on Leave until all processes have left.
|
||||||
type DoubleBarrier struct {
|
type DoubleBarrier struct {
|
||||||
client *clientv3.Client
|
client *clientv3.Client
|
||||||
kv clientv3.KV
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
key string // key for the collective barrier
|
key string // key for the collective barrier
|
||||||
@ -35,7 +34,6 @@ type DoubleBarrier struct {
|
|||||||
func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier {
|
func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier {
|
||||||
return &DoubleBarrier{
|
return &DoubleBarrier{
|
||||||
client: client,
|
client: client,
|
||||||
kv: clientv3.NewKV(client),
|
|
||||||
ctx: context.TODO(),
|
ctx: context.TODO(),
|
||||||
key: key,
|
key: key,
|
||||||
count: count,
|
count: count,
|
||||||
@ -50,7 +48,7 @@ func (b *DoubleBarrier) Enter() error {
|
|||||||
}
|
}
|
||||||
b.myKey = ek
|
b.myKey = ek
|
||||||
|
|
||||||
resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
|
resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -61,7 +59,7 @@ func (b *DoubleBarrier) Enter() error {
|
|||||||
|
|
||||||
if len(resp.Kvs) == b.count {
|
if len(resp.Kvs) == b.count {
|
||||||
// unblock waiters
|
// unblock waiters
|
||||||
_, err = b.kv.Put(b.ctx, b.key+"/ready", "")
|
_, err = b.client.Put(b.ctx, b.key+"/ready", "")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,7 +73,7 @@ func (b *DoubleBarrier) Enter() error {
|
|||||||
|
|
||||||
// Leave waits for "count" processes to leave the barrier then returns
|
// Leave waits for "count" processes to leave the barrier then returns
|
||||||
func (b *DoubleBarrier) Leave() error {
|
func (b *DoubleBarrier) Leave() error {
|
||||||
resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
|
resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
|
||||||
if len(resp.Kvs) == 0 {
|
if len(resp.Kvs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -93,7 +91,7 @@ func (b *DoubleBarrier) Leave() error {
|
|||||||
|
|
||||||
if len(resp.Kvs) == 1 {
|
if len(resp.Kvs) == 1 {
|
||||||
// this is the only node in the barrier; finish up
|
// this is the only node in the barrier; finish up
|
||||||
if _, err = b.kv.Delete(b.ctx, b.key+"/ready"); err != nil {
|
if _, err = b.client.Delete(b.ctx, b.key+"/ready"); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return b.myKey.Delete()
|
return b.myKey.Delete()
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
|
|
||||||
type Election struct {
|
type Election struct {
|
||||||
client *v3.Client
|
client *v3.Client
|
||||||
kv v3.KV
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
keyPrefix string
|
keyPrefix string
|
||||||
@ -31,7 +30,7 @@ type Election struct {
|
|||||||
|
|
||||||
// NewElection returns a new election on a given key prefix.
|
// NewElection returns a new election on a given key prefix.
|
||||||
func NewElection(client *v3.Client, keyPrefix string) *Election {
|
func NewElection(client *v3.Client, keyPrefix string) *Election {
|
||||||
return &Election{client, v3.NewKV(client), context.TODO(), keyPrefix, nil}
|
return &Election{client, context.TODO(), keyPrefix, nil}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Volunteer puts a value as eligible for the election. It blocks until
|
// Volunteer puts a value as eligible for the election. It blocks until
|
||||||
@ -62,7 +61,7 @@ func (e *Election) Resign() (err error) {
|
|||||||
|
|
||||||
// Leader returns the leader value for the current election.
|
// Leader returns the leader value for the current election.
|
||||||
func (e *Election) Leader() (string, error) {
|
func (e *Election) Leader() (string, error) {
|
||||||
resp, err := e.kv.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...)
|
resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if len(resp.Kvs) == 0 {
|
} else if len(resp.Kvs) == 0 {
|
||||||
@ -74,7 +73,7 @@ func (e *Election) Leader() (string, error) {
|
|||||||
|
|
||||||
// Wait waits for a leader to be elected, returning the leader value.
|
// Wait waits for a leader to be elected, returning the leader value.
|
||||||
func (e *Election) Wait() (string, error) {
|
func (e *Election) Wait() (string, error) {
|
||||||
resp, err := e.kv.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...)
|
resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if len(resp.Kvs) != 0 {
|
} else if len(resp.Kvs) != 0 {
|
||||||
@ -94,7 +93,7 @@ func (e *Election) Wait() (string, error) {
|
|||||||
|
|
||||||
func (e *Election) waitLeadership(tryKey *EphemeralKV) error {
|
func (e *Election) waitLeadership(tryKey *EphemeralKV) error {
|
||||||
opts := append(v3.WithLastCreate(), v3.WithRev(tryKey.Revision()-1))
|
opts := append(v3.WithLastCreate(), v3.WithRev(tryKey.Revision()-1))
|
||||||
resp, err := e.kv.Get(e.ctx, e.keyPrefix, opts...)
|
resp, err := e.client.Get(e.ctx, e.keyPrefix, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if len(resp.Kvs) == 0 {
|
} else if len(resp.Kvs) == 0 {
|
||||||
|
@ -166,7 +166,7 @@ func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
k, err := NewKV(v3.NewKV(client), key, val, s.Lease())
|
k, err := NewKV(client, key, val, s.Lease())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -25,20 +25,19 @@ import (
|
|||||||
// PriorityQueue implements a multi-reader, multi-writer distributed queue.
|
// PriorityQueue implements a multi-reader, multi-writer distributed queue.
|
||||||
type PriorityQueue struct {
|
type PriorityQueue struct {
|
||||||
client *v3.Client
|
client *v3.Client
|
||||||
kv v3.KV
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPriorityQueue creates an etcd priority queue.
|
// NewPriorityQueue creates an etcd priority queue.
|
||||||
func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue {
|
func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue {
|
||||||
return &PriorityQueue{client, v3.NewKV(client), context.TODO(), key + "/"}
|
return &PriorityQueue{client, context.TODO(), key + "/"}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue puts a value into a queue with a given priority.
|
// Enqueue puts a value into a queue with a given priority.
|
||||||
func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
|
func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
|
||||||
prefix := fmt.Sprintf("%s%05d", q.key, pr)
|
prefix := fmt.Sprintf("%s%05d", q.key, pr)
|
||||||
_, err := NewSequentialKV(q.kv, prefix, val)
|
_, err := NewSequentialKV(q.client, prefix, val)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -46,12 +45,12 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
|
|||||||
// queue is empty, Dequeue blocks until items are available.
|
// queue is empty, Dequeue blocks until items are available.
|
||||||
func (q *PriorityQueue) Dequeue() (string, error) {
|
func (q *PriorityQueue) Dequeue() (string, error) {
|
||||||
// TODO: fewer round trips by fetching more than one key
|
// TODO: fewer round trips by fetching more than one key
|
||||||
resp, err := q.kv.Get(q.ctx, q.key, v3.WithFirstKey()...)
|
resp, err := q.client.Get(q.ctx, q.key, v3.WithFirstKey()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
kv, err := claimFirstKey(q.kv, resp.Kvs)
|
kv, err := claimFirstKey(q.client, resp.Kvs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if kv != nil {
|
} else if kv != nil {
|
||||||
@ -71,7 +70,7 @@ func (q *PriorityQueue) Dequeue() (string, error) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision)
|
ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if !ok {
|
} else if !ok {
|
||||||
|
@ -23,18 +23,17 @@ import (
|
|||||||
// Queue implements a multi-reader, multi-writer distributed queue.
|
// Queue implements a multi-reader, multi-writer distributed queue.
|
||||||
type Queue struct {
|
type Queue struct {
|
||||||
client *v3.Client
|
client *v3.Client
|
||||||
kv v3.KV
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
keyPrefix string
|
keyPrefix string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewQueue(client *v3.Client, keyPrefix string) *Queue {
|
func NewQueue(client *v3.Client, keyPrefix string) *Queue {
|
||||||
return &Queue{client, v3.NewKV(client), context.TODO(), keyPrefix}
|
return &Queue{client, context.TODO(), keyPrefix}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queue) Enqueue(val string) error {
|
func (q *Queue) Enqueue(val string) error {
|
||||||
_, err := NewUniqueKV(q.kv, q.keyPrefix, val, 0)
|
_, err := NewUniqueKV(q.client, q.keyPrefix, val, 0)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -42,12 +41,12 @@ func (q *Queue) Enqueue(val string) error {
|
|||||||
// queue is empty, Dequeue blocks until elements are available.
|
// queue is empty, Dequeue blocks until elements are available.
|
||||||
func (q *Queue) Dequeue() (string, error) {
|
func (q *Queue) Dequeue() (string, error) {
|
||||||
// TODO: fewer round trips by fetching more than one key
|
// TODO: fewer round trips by fetching more than one key
|
||||||
resp, err := q.kv.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
|
resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
kv, err := claimFirstKey(q.kv, resp.Kvs)
|
kv, err := claimFirstKey(q.client, resp.Kvs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if kv != nil {
|
} else if kv != nil {
|
||||||
@ -67,7 +66,7 @@ func (q *Queue) Dequeue() (string, error) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision)
|
ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if !ok {
|
} else if !ok {
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
|
|
||||||
type RWMutex struct {
|
type RWMutex struct {
|
||||||
client *v3.Client
|
client *v3.Client
|
||||||
kv v3.KV
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
key string
|
key string
|
||||||
@ -30,7 +29,7 @@ type RWMutex struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewRWMutex(client *v3.Client, key string) *RWMutex {
|
func NewRWMutex(client *v3.Client, key string) *RWMutex {
|
||||||
return &RWMutex{client, v3.NewKV(client), context.TODO(), key, nil}
|
return &RWMutex{client, context.TODO(), key, nil}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rwm *RWMutex) RLock() error {
|
func (rwm *RWMutex) RLock() error {
|
||||||
@ -42,7 +41,7 @@ func (rwm *RWMutex) RLock() error {
|
|||||||
|
|
||||||
// if there are nodes with "write-" and a lower
|
// if there are nodes with "write-" and a lower
|
||||||
// revision number than us we must wait
|
// revision number than us we must wait
|
||||||
resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...)
|
resp, err := rwm.client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -63,7 +62,7 @@ func (rwm *RWMutex) Lock() error {
|
|||||||
for {
|
for {
|
||||||
// find any key of lower rev number blocks the write lock
|
// find any key of lower rev number blocks the write lock
|
||||||
opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1))
|
opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1))
|
||||||
resp, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...)
|
resp, err := rwm.client.Get(rwm.ctx, rwm.key, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -83,7 +82,7 @@ func (rwm *RWMutex) Lock() error {
|
|||||||
func (rwm *RWMutex) waitOnLowest() error {
|
func (rwm *RWMutex) waitOnLowest() error {
|
||||||
// must block; get key before ek for waiting
|
// must block; get key before ek for waiting
|
||||||
opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1))
|
opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1))
|
||||||
lastKey, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...)
|
lastKey, err := rwm.client.Get(rwm.ctx, rwm.key, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
// STM implements software transactional memory over etcd
|
// STM implements software transactional memory over etcd
|
||||||
type STM struct {
|
type STM struct {
|
||||||
client *v3.Client
|
client *v3.Client
|
||||||
kv v3.KV
|
|
||||||
// rset holds the read key's value and revision of read
|
// rset holds the read key's value and revision of read
|
||||||
rset map[string]*RemoteKV
|
rset map[string]*RemoteKV
|
||||||
// wset holds the write key and its value
|
// wset holds the write key and its value
|
||||||
@ -34,7 +33,7 @@ type STM struct {
|
|||||||
|
|
||||||
// NewSTM creates new transaction loop for a given apply function.
|
// NewSTM creates new transaction loop for a given apply function.
|
||||||
func NewSTM(client *v3.Client, apply func(*STM) error) <-chan error {
|
func NewSTM(client *v3.Client, apply func(*STM) error) <-chan error {
|
||||||
s := &STM{client: client, kv: v3.NewKV(client), apply: apply}
|
s := &STM{client: client, apply: apply}
|
||||||
errc := make(chan error, 1)
|
errc := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
var err error
|
var err error
|
||||||
@ -64,7 +63,7 @@ func (s *STM) Get(key string) (string, error) {
|
|||||||
if rk, ok := s.rset[key]; ok {
|
if rk, ok := s.rset[key]; ok {
|
||||||
return rk.Value(), nil
|
return rk.Value(), nil
|
||||||
}
|
}
|
||||||
rk, err := GetRemoteKV(s.kv, key)
|
rk, err := GetRemoteKV(s.client, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -91,7 +90,7 @@ func (s *STM) commit() (ok bool, rr error) {
|
|||||||
for k, v := range s.wset {
|
for k, v := range s.wset {
|
||||||
puts = append(puts, v3.OpPut(k, v))
|
puts = append(puts, v3.OpPut(k, v))
|
||||||
}
|
}
|
||||||
txnresp, err := s.kv.Txn(context.TODO()).If(cmps...).Then(puts...).Commit()
|
txnresp, err := s.client.Txn(context.TODO()).If(cmps...).Then(puts...).Commit()
|
||||||
return txnresp.Succeeded, err
|
return txnresp.Succeeded, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,23 +22,19 @@ import (
|
|||||||
|
|
||||||
// WaitEvents waits on a key until it observes the given events and returns the final one.
|
// WaitEvents waits on a key until it observes the given events and returns the final one.
|
||||||
func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
|
func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
|
||||||
w := clientv3.NewWatcher(c)
|
wc := c.Watch(context.Background(), key, clientv3.WithRev(rev))
|
||||||
wc := w.Watch(context.Background(), key, clientv3.WithRev(rev))
|
|
||||||
if wc == nil {
|
if wc == nil {
|
||||||
w.Close()
|
|
||||||
return nil, ErrNoWatcher
|
return nil, ErrNoWatcher
|
||||||
}
|
}
|
||||||
return waitEvents(wc, evs), w.Close()
|
return waitEvents(wc, evs), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
|
func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
|
||||||
w := clientv3.NewWatcher(c)
|
wc := c.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev))
|
||||||
wc := w.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev))
|
|
||||||
if wc == nil {
|
if wc == nil {
|
||||||
w.Close()
|
|
||||||
return nil, ErrNoWatcher
|
return nil, ErrNoWatcher
|
||||||
}
|
}
|
||||||
return waitEvents(wc, evs), w.Close()
|
return waitEvents(wc, evs), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *storagepb.Event {
|
func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *storagepb.Event {
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/clientv3"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewCompactionCommand returns the cobra command for "compaction".
|
// NewCompactionCommand returns the cobra command for "compaction".
|
||||||
@ -44,7 +43,7 @@ func compactionCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
c := mustClientFromCmd(cmd)
|
c := mustClientFromCmd(cmd)
|
||||||
if cerr := clientv3.NewKV(c).Compact(context.TODO(), rev); cerr != nil {
|
if cerr := c.Compact(context.TODO(), rev); cerr != nil {
|
||||||
ExitWithError(ExitError, cerr)
|
ExitWithError(ExitError, cerr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -34,9 +34,7 @@ func NewDelCommand() *cobra.Command {
|
|||||||
// delCommandFunc executes the "del" command.
|
// delCommandFunc executes the "del" command.
|
||||||
func delCommandFunc(cmd *cobra.Command, args []string) {
|
func delCommandFunc(cmd *cobra.Command, args []string) {
|
||||||
key, opts := getDelOp(cmd, args)
|
key, opts := getDelOp(cmd, args)
|
||||||
c := mustClientFromCmd(cmd)
|
resp, err := mustClientFromCmd(cmd).Delete(context.TODO(), key, opts...)
|
||||||
kvapi := clientv3.NewKV(c)
|
|
||||||
resp, err := kvapi.Delete(context.TODO(), key, opts...)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, err)
|
ExitWithError(ExitError, err)
|
||||||
}
|
}
|
||||||
|
@ -49,9 +49,7 @@ func NewGetCommand() *cobra.Command {
|
|||||||
// getCommandFunc executes the "get" command.
|
// getCommandFunc executes the "get" command.
|
||||||
func getCommandFunc(cmd *cobra.Command, args []string) {
|
func getCommandFunc(cmd *cobra.Command, args []string) {
|
||||||
key, opts := getGetOp(cmd, args)
|
key, opts := getGetOp(cmd, args)
|
||||||
c := mustClientFromCmd(cmd)
|
resp, err := mustClientFromCmd(cmd).Get(context.TODO(), key, opts...)
|
||||||
kvapi := clientv3.NewKV(c)
|
|
||||||
resp, err := kvapi.Get(context.TODO(), key, opts...)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, err)
|
ExitWithError(ExitError, err)
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/clientv3"
|
|
||||||
"github.com/coreos/etcd/lease"
|
"github.com/coreos/etcd/lease"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -62,9 +61,7 @@ func leaseCreateCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
ExitWithError(ExitBadArgs, fmt.Errorf("bad TTL (%v)", err))
|
ExitWithError(ExitBadArgs, fmt.Errorf("bad TTL (%v)", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
c := mustClientFromCmd(cmd)
|
resp, err := mustClientFromCmd(cmd).Create(context.TODO(), ttl)
|
||||||
l := clientv3.NewLease(c)
|
|
||||||
resp, err := l.Create(context.TODO(), ttl)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "failed to create lease (%v)\n", err)
|
fmt.Fprintf(os.Stderr, "failed to create lease (%v)\n", err)
|
||||||
return
|
return
|
||||||
@ -95,9 +92,7 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
|
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
c := mustClientFromCmd(cmd)
|
_, err = mustClientFromCmd(cmd).Revoke(context.TODO(), lease.LeaseID(id))
|
||||||
l := clientv3.NewLease(c)
|
|
||||||
_, err = l.Revoke(context.TODO(), lease.LeaseID(id))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "failed to revoke lease (%v)\n", err)
|
fmt.Fprintf(os.Stderr, "failed to revoke lease (%v)\n", err)
|
||||||
return
|
return
|
||||||
@ -128,9 +123,7 @@ func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
|
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
c := mustClientFromCmd(cmd)
|
respc, kerr := mustClientFromCmd(cmd).KeepAlive(context.TODO(), lease.LeaseID(id))
|
||||||
l := clientv3.NewLease(c)
|
|
||||||
respc, kerr := l.KeepAlive(context.TODO(), lease.LeaseID(id))
|
|
||||||
if kerr != nil {
|
if kerr != nil {
|
||||||
ExitWithError(ExitBadConnection, kerr)
|
ExitWithError(ExitBadConnection, kerr)
|
||||||
}
|
}
|
||||||
|
@ -68,7 +68,7 @@ func lockUntilSignal(c *clientv3.Client, lockname string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
k, kerr := clientv3.NewKV(c).Get(ctx, m.Key())
|
k, kerr := c.Get(ctx, m.Key())
|
||||||
if kerr != nil {
|
if kerr != nil {
|
||||||
return kerr
|
return kerr
|
||||||
}
|
}
|
||||||
|
@ -75,15 +75,13 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// TODO: remove the prefix of the destination cluster?
|
// TODO: remove the prefix of the destination cluster?
|
||||||
dkv := clientv3.NewKV(dc)
|
|
||||||
|
|
||||||
s := mirror.NewSyncer(c, mmprefix, 0)
|
s := mirror.NewSyncer(c, mmprefix, 0)
|
||||||
|
|
||||||
rc, errc := s.SyncBase(ctx)
|
rc, errc := s.SyncBase(ctx)
|
||||||
|
|
||||||
for r := range rc {
|
for r := range rc {
|
||||||
for _, kv := range r.Kvs {
|
for _, kv := range r.Kvs {
|
||||||
_, err := dkv.Put(ctx, string(kv.Key), string(kv.Value))
|
_, err := dc.Put(ctx, string(kv.Key), string(kv.Value))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -109,7 +107,7 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
|
|||||||
for _, ev := range wr.Events {
|
for _, ev := range wr.Events {
|
||||||
nrev := ev.Kv.ModRevision
|
nrev := ev.Kv.ModRevision
|
||||||
if rev != 0 && nrev > rev {
|
if rev != 0 && nrev > rev {
|
||||||
_, err := dkv.Txn(ctx).Then(ops...).Commit()
|
_, err := dc.Txn(ctx).Then(ops...).Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -128,7 +126,7 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(ops) != 0 {
|
if len(ops) != 0 {
|
||||||
_, err := dkv.Txn(ctx).Then(ops...).Commit()
|
_, err := dc.Txn(ctx).Then(ops...).Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -108,8 +107,7 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
urls := strings.Split(memberPeerURLs, ",")
|
urls := strings.Split(memberPeerURLs, ",")
|
||||||
|
|
||||||
req := &pb.MemberAddRequest{PeerURLs: urls}
|
resp, err := mustClientFromCmd(cmd).MemberAdd(context.TODO(), urls)
|
||||||
resp, err := mustClientFromCmd(cmd).Cluster.MemberAdd(context.TODO(), req)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, err)
|
ExitWithError(ExitError, err)
|
||||||
}
|
}
|
||||||
@ -128,8 +126,7 @@ func memberRemoveCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
ExitWithError(ExitBadArgs, fmt.Errorf("bad member ID arg (%v), expecting ID in Hex", err))
|
ExitWithError(ExitBadArgs, fmt.Errorf("bad member ID arg (%v), expecting ID in Hex", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
req := &pb.MemberRemoveRequest{ID: uint64(id)}
|
resp, err := mustClientFromCmd(cmd).MemberRemove(context.TODO(), id)
|
||||||
resp, err := mustClientFromCmd(cmd).Cluster.MemberRemove(context.TODO(), req)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, err)
|
ExitWithError(ExitError, err)
|
||||||
}
|
}
|
||||||
@ -154,8 +151,7 @@ func memberUpdateCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
urls := strings.Split(memberPeerURLs, ",")
|
urls := strings.Split(memberPeerURLs, ",")
|
||||||
|
|
||||||
req := &pb.MemberUpdateRequest{ID: uint64(id), PeerURLs: urls}
|
resp, err := mustClientFromCmd(cmd).MemberUpdate(context.TODO(), id, urls)
|
||||||
resp, err := mustClientFromCmd(cmd).Cluster.MemberUpdate(context.TODO(), req)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, err)
|
ExitWithError(ExitError, err)
|
||||||
}
|
}
|
||||||
@ -165,7 +161,7 @@ func memberUpdateCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
// memberListCommandFunc executes the "member list" command.
|
// memberListCommandFunc executes the "member list" command.
|
||||||
func memberListCommandFunc(cmd *cobra.Command, args []string) {
|
func memberListCommandFunc(cmd *cobra.Command, args []string) {
|
||||||
resp, err := mustClientFromCmd(cmd).Cluster.MemberList(context.TODO(), &pb.MemberListRequest{})
|
resp, err := mustClientFromCmd(cmd).MemberList(context.TODO())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, err)
|
ExitWithError(ExitError, err)
|
||||||
}
|
}
|
||||||
|
@ -58,9 +58,7 @@ will store the content of the file to <key>.
|
|||||||
func putCommandFunc(cmd *cobra.Command, args []string) {
|
func putCommandFunc(cmd *cobra.Command, args []string) {
|
||||||
key, value, opts := getPutOp(cmd, args)
|
key, value, opts := getPutOp(cmd, args)
|
||||||
|
|
||||||
c := mustClientFromCmd(cmd)
|
resp, err := mustClientFromCmd(cmd).Put(context.TODO(), key, value, opts...)
|
||||||
kvapi := clientv3.NewKV(c)
|
|
||||||
resp, err := kvapi.Put(context.TODO(), key, value, opts...)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, err)
|
ExitWithError(ExitError, err)
|
||||||
}
|
}
|
||||||
|
@ -52,9 +52,7 @@ func snapshotCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
// snapshotToStdout streams a snapshot over stdout
|
// snapshotToStdout streams a snapshot over stdout
|
||||||
func snapshotToStdout(c *clientv3.Client) {
|
func snapshotToStdout(c *clientv3.Client) {
|
||||||
// must explicitly fetch first revision since no retry on stdout
|
// must explicitly fetch first revision since no retry on stdout
|
||||||
wapi := clientv3.NewWatcher(c)
|
wr := <-c.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1))
|
||||||
defer wapi.Close()
|
|
||||||
wr := <-wapi.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1))
|
|
||||||
if len(wr.Events) > 0 {
|
if len(wr.Events) > 0 {
|
||||||
wr.CompactRevision = 1
|
wr.CompactRevision = 1
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ func txnCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
reader := bufio.NewReader(os.Stdin)
|
reader := bufio.NewReader(os.Stdin)
|
||||||
|
|
||||||
txn := clientv3.NewKV(mustClientFromCmd(cmd)).Txn(context.Background())
|
txn := mustClientFromCmd(cmd).Txn(context.Background())
|
||||||
fmt.Println("compares:")
|
fmt.Println("compares:")
|
||||||
txn.If(readCompares(reader)...)
|
txn.If(readCompares(reader)...)
|
||||||
fmt.Println("success requests (get, put, delete):")
|
fmt.Println("success requests (get, put, delete):")
|
||||||
|
@ -57,16 +57,14 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
ExitWithError(ExitBadArgs, fmt.Errorf("watch in non-interactive mode requires an argument as key or prefix"))
|
ExitWithError(ExitBadArgs, fmt.Errorf("watch in non-interactive mode requires an argument as key or prefix"))
|
||||||
}
|
}
|
||||||
|
|
||||||
c := mustClientFromCmd(cmd)
|
|
||||||
w := clientv3.NewWatcher(c)
|
|
||||||
|
|
||||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
||||||
if watchPrefix {
|
if watchPrefix {
|
||||||
opts = append(opts, clientv3.WithPrefix())
|
opts = append(opts, clientv3.WithPrefix())
|
||||||
}
|
}
|
||||||
wc := w.Watch(context.TODO(), args[0], opts...)
|
c := mustClientFromCmd(cmd)
|
||||||
|
wc := c.Watch(context.TODO(), args[0], opts...)
|
||||||
printWatchCh(wc)
|
printWatchCh(wc)
|
||||||
err := w.Close()
|
err := c.Close()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
||||||
}
|
}
|
||||||
@ -75,7 +73,6 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
||||||
c := mustClientFromCmd(cmd)
|
c := mustClientFromCmd(cmd)
|
||||||
w := clientv3.NewWatcher(c)
|
|
||||||
|
|
||||||
reader := bufio.NewReader(os.Stdin)
|
reader := bufio.NewReader(os.Stdin)
|
||||||
|
|
||||||
@ -117,7 +114,7 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
|||||||
if watchPrefix {
|
if watchPrefix {
|
||||||
opts = append(opts, clientv3.WithPrefix())
|
opts = append(opts, clientv3.WithPrefix())
|
||||||
}
|
}
|
||||||
ch := w.Watch(context.TODO(), key, opts...)
|
ch := c.Watch(context.TODO(), key, opts...)
|
||||||
go printWatchCh(ch)
|
go printWatchCh(ch)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -37,6 +37,7 @@ import (
|
|||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||||
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
"github.com/coreos/etcd/pkg/testutil"
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
"github.com/coreos/etcd/pkg/transport"
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
@ -733,3 +734,23 @@ func (c *ClusterV3) RandClient() *clientv3.Client {
|
|||||||
func (c *ClusterV3) Client(i int) *clientv3.Client {
|
func (c *ClusterV3) Client(i int) *clientv3.Client {
|
||||||
return c.clients[i]
|
return c.clients[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type grpcAPI struct {
|
||||||
|
// Cluster is the cluster API for the client's connection.
|
||||||
|
Cluster pb.ClusterClient
|
||||||
|
// KV is the keyvalue API for the client's connection.
|
||||||
|
KV pb.KVClient
|
||||||
|
// Lease is the lease API for the client's connection.
|
||||||
|
Lease pb.LeaseClient
|
||||||
|
// Watch is the watch API for the client's connection.
|
||||||
|
Watch pb.WatchClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func toGRPC(c *clientv3.Client) grpcAPI {
|
||||||
|
return grpcAPI{
|
||||||
|
pb.NewClusterClient(c.ActiveConnection()),
|
||||||
|
pb.NewKVClient(c.ActiveConnection()),
|
||||||
|
pb.NewLeaseClient(c.ActiveConnection()),
|
||||||
|
pb.NewWatchClient(c.ActiveConnection()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -33,7 +33,7 @@ func TestV3PutOverwrite(t *testing.T) {
|
|||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
key := []byte("foo")
|
key := []byte("foo")
|
||||||
reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
|
reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
|
||||||
|
|
||||||
@ -77,7 +77,7 @@ func TestV3TxnTooManyOps(t *testing.T) {
|
|||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
|
|
||||||
// unique keys
|
// unique keys
|
||||||
i := new(int)
|
i := new(int)
|
||||||
@ -161,7 +161,7 @@ func TestV3TxnDuplicateKeys(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
txnSuccess []*pb.RequestUnion
|
txnSuccess []*pb.RequestUnion
|
||||||
|
|
||||||
@ -208,7 +208,7 @@ func TestV3PutMissingLease(t *testing.T) {
|
|||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
key := []byte("foo")
|
key := []byte("foo")
|
||||||
preq := &pb.PutRequest{Key: key, Lease: 123456}
|
preq := &pb.PutRequest{Key: key, Lease: 123456}
|
||||||
tests := []func(){
|
tests := []func(){
|
||||||
@ -324,7 +324,7 @@ func TestV3DeleteRange(t *testing.T) {
|
|||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
|
|
||||||
ks := tt.keySet
|
ks := tt.keySet
|
||||||
for j := range ks {
|
for j := range ks {
|
||||||
@ -375,7 +375,7 @@ func TestV3TxnInvaildRange(t *testing.T) {
|
|||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
||||||
|
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
@ -419,7 +419,7 @@ func TestV3TooLargeRequest(t *testing.T) {
|
|||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
|
|
||||||
// 2MB request value
|
// 2MB request value
|
||||||
largeV := make([]byte, 2*1024*1024)
|
largeV := make([]byte, 2*1024*1024)
|
||||||
@ -437,7 +437,7 @@ func TestV3Hash(t *testing.T) {
|
|||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
||||||
|
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
@ -590,7 +590,7 @@ func TestV3RangeRequest(t *testing.T) {
|
|||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
for _, k := range tt.putKeys {
|
for _, k := range tt.putKeys {
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
|
req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
|
||||||
if _, err := kvc.Put(context.TODO(), req); err != nil {
|
if _, err := kvc.Put(context.TODO(), req); err != nil {
|
||||||
t.Fatalf("#%d: couldn't put key (%v)", i, err)
|
t.Fatalf("#%d: couldn't put key (%v)", i, err)
|
||||||
@ -598,7 +598,7 @@ func TestV3RangeRequest(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for j, req := range tt.reqs {
|
for j, req := range tt.reqs {
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
resp, err := kvc.Range(context.TODO(), &req)
|
resp, err := kvc.Range(context.TODO(), &req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("#%d.%d: Range error: %v", i, j, err)
|
t.Errorf("#%d.%d: Range error: %v", i, j, err)
|
||||||
@ -668,7 +668,7 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) {
|
|||||||
donec := make(chan error, 1)
|
donec := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
||||||
_, perr := client.KV.Put(ctx, reqput)
|
_, perr := toGRPC(client).KV.Put(ctx, reqput)
|
||||||
donec <- perr
|
donec <- perr
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -717,7 +717,7 @@ func TestTLSGRPCAcceptSecureAll(t *testing.T) {
|
|||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
||||||
if _, err := client.KV.Put(context.TODO(), reqput); err != nil {
|
if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
|
||||||
t.Fatalf("unexpected error on put over tls (%v)", err)
|
t.Fatalf("unexpected error on put over tls (%v)", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ func TestV3LeasePrmote(t *testing.T) {
|
|||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
// create lease
|
// create lease
|
||||||
lresp, err := clus.RandClient().Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5})
|
lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -78,7 +78,7 @@ func TestV3LeasePrmote(t *testing.T) {
|
|||||||
func TestV3LeaseRevoke(t *testing.T) {
|
func TestV3LeaseRevoke(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
|
testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
|
||||||
lc := clus.RandClient().Lease
|
lc := toGRPC(clus.RandClient()).Lease
|
||||||
_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
|
_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -91,7 +91,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
|
|||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
// create fixed lease
|
// create fixed lease
|
||||||
lresp, err := clus.RandClient().Lease.LeaseCreate(
|
lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate(
|
||||||
context.TODO(),
|
context.TODO(),
|
||||||
&pb.LeaseCreateRequest{ID: 1, TTL: 1})
|
&pb.LeaseCreateRequest{ID: 1, TTL: 1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -102,7 +102,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create duplicate fixed lease
|
// create duplicate fixed lease
|
||||||
lresp, err = clus.RandClient().Lease.LeaseCreate(
|
lresp, err = toGRPC(clus.RandClient()).Lease.LeaseCreate(
|
||||||
context.TODO(),
|
context.TODO(),
|
||||||
&pb.LeaseCreateRequest{ID: 1, TTL: 1})
|
&pb.LeaseCreateRequest{ID: 1, TTL: 1})
|
||||||
if err != v3rpc.ErrLeaseExist {
|
if err != v3rpc.ErrLeaseExist {
|
||||||
@ -110,7 +110,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create fresh fixed lease
|
// create fresh fixed lease
|
||||||
lresp, err = clus.RandClient().Lease.LeaseCreate(
|
lresp, err = toGRPC(clus.RandClient()).Lease.LeaseCreate(
|
||||||
context.TODO(),
|
context.TODO(),
|
||||||
&pb.LeaseCreateRequest{ID: 2, TTL: 1})
|
&pb.LeaseCreateRequest{ID: 2, TTL: 1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -129,7 +129,7 @@ func TestV3LeaseExpire(t *testing.T) {
|
|||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
wStream, err := clus.RandClient().Watch.Watch(ctx)
|
wStream, err := toGRPC(clus.RandClient()).Watch.Watch(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -177,7 +177,7 @@ func TestV3LeaseExpire(t *testing.T) {
|
|||||||
func TestV3LeaseKeepAlive(t *testing.T) {
|
func TestV3LeaseKeepAlive(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
|
testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
|
||||||
lc := clus.RandClient().Lease
|
lc := toGRPC(clus.RandClient()).Lease
|
||||||
lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
|
lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -215,7 +215,7 @@ func TestV3LeaseExists(t *testing.T) {
|
|||||||
// create lease
|
// create lease
|
||||||
ctx0, cancel0 := context.WithCancel(context.Background())
|
ctx0, cancel0 := context.WithCancel(context.Background())
|
||||||
defer cancel0()
|
defer cancel0()
|
||||||
lresp, err := clus.RandClient().Lease.LeaseCreate(
|
lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate(
|
||||||
ctx0,
|
ctx0,
|
||||||
&pb.LeaseCreateRequest{TTL: 30})
|
&pb.LeaseCreateRequest{TTL: 30})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -241,34 +241,34 @@ func TestV3LeaseSwitch(t *testing.T) {
|
|||||||
// create lease
|
// create lease
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
lresp1, err1 := clus.RandClient().Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30})
|
lresp1, err1 := toGRPC(clus.RandClient()).Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatal(err1)
|
t.Fatal(err1)
|
||||||
}
|
}
|
||||||
lresp2, err2 := clus.RandClient().Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30})
|
lresp2, err2 := toGRPC(clus.RandClient()).Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30})
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
t.Fatal(err2)
|
t.Fatal(err2)
|
||||||
}
|
}
|
||||||
|
|
||||||
// attach key on lease1 then switch it to lease2
|
// attach key on lease1 then switch it to lease2
|
||||||
put1 := &pb.PutRequest{Key: []byte(key), Lease: lresp1.ID}
|
put1 := &pb.PutRequest{Key: []byte(key), Lease: lresp1.ID}
|
||||||
_, err := clus.RandClient().KV.Put(ctx, put1)
|
_, err := toGRPC(clus.RandClient()).KV.Put(ctx, put1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
put2 := &pb.PutRequest{Key: []byte(key), Lease: lresp2.ID}
|
put2 := &pb.PutRequest{Key: []byte(key), Lease: lresp2.ID}
|
||||||
_, err = clus.RandClient().KV.Put(ctx, put2)
|
_, err = toGRPC(clus.RandClient()).KV.Put(ctx, put2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// revoke lease1 should not remove key
|
// revoke lease1 should not remove key
|
||||||
_, err = clus.RandClient().Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID})
|
_, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
rreq := &pb.RangeRequest{Key: []byte("foo")}
|
rreq := &pb.RangeRequest{Key: []byte("foo")}
|
||||||
rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq)
|
rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -277,11 +277,11 @@ func TestV3LeaseSwitch(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// revoke lease2 should remove key
|
// revoke lease2 should remove key
|
||||||
_, err = clus.RandClient().Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID})
|
_, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
rresp, err = clus.RandClient().KV.Range(context.TODO(), rreq)
|
rresp, err = toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -293,7 +293,7 @@ func TestV3LeaseSwitch(t *testing.T) {
|
|||||||
// acquireLeaseAndKey creates a new lease and creates an attached key.
|
// acquireLeaseAndKey creates a new lease and creates an attached key.
|
||||||
func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
|
func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
|
||||||
// create lease
|
// create lease
|
||||||
lresp, err := clus.RandClient().Lease.LeaseCreate(
|
lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate(
|
||||||
context.TODO(),
|
context.TODO(),
|
||||||
&pb.LeaseCreateRequest{TTL: 1})
|
&pb.LeaseCreateRequest{TTL: 1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -304,7 +304,7 @@ func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
|
|||||||
}
|
}
|
||||||
// attach to key
|
// attach to key
|
||||||
put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
|
put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
|
||||||
if _, err := clus.RandClient().KV.Put(context.TODO(), put); err != nil {
|
if _, err := toGRPC(clus.RandClient()).KV.Put(context.TODO(), put); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
return lresp.ID, nil
|
return lresp.ID, nil
|
||||||
@ -327,7 +327,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
|
|||||||
|
|
||||||
// confirm no key
|
// confirm no key
|
||||||
rreq := &pb.RangeRequest{Key: []byte("foo")}
|
rreq := &pb.RangeRequest{Key: []byte("foo")}
|
||||||
rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq)
|
rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -337,7 +337,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func leaseExist(t *testing.T, clus *ClusterV3, leaseID int64) bool {
|
func leaseExist(t *testing.T, clus *ClusterV3, leaseID int64) bool {
|
||||||
l := clus.RandClient().Lease
|
l := toGRPC(clus.RandClient()).Lease
|
||||||
|
|
||||||
_, err := l.LeaseCreate(context.Background(), &pb.LeaseCreateRequest{ID: leaseID, TTL: 5})
|
_, err := l.LeaseCreate(context.Background(), &pb.LeaseCreateRequest{ID: leaseID, TTL: 5})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -19,7 +19,6 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
v3 "github.com/coreos/etcd/clientv3"
|
|
||||||
"github.com/coreos/etcd/contrib/recipes"
|
"github.com/coreos/etcd/contrib/recipes"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -31,7 +30,7 @@ func TestSTMConflict(t *testing.T) {
|
|||||||
etcdc := clus.RandClient()
|
etcdc := clus.RandClient()
|
||||||
keys := make([]*recipe.RemoteKV, 5)
|
keys := make([]*recipe.RemoteKV, 5)
|
||||||
for i := 0; i < len(keys); i++ {
|
for i := 0; i < len(keys); i++ {
|
||||||
rk, err := recipe.NewKV(v3.NewKV(etcdc), fmt.Sprintf("foo-%d", i), "100", 0)
|
rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not make key (%v)", err)
|
t.Fatalf("could not make key (%v)", err)
|
||||||
}
|
}
|
||||||
@ -76,7 +75,7 @@ func TestSTMConflict(t *testing.T) {
|
|||||||
// ensure sum matches initial sum
|
// ensure sum matches initial sum
|
||||||
sum := 0
|
sum := 0
|
||||||
for _, oldRK := range keys {
|
for _, oldRK := range keys {
|
||||||
rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), oldRK.Key())
|
rk, err := recipe.GetRemoteKV(etcdc, oldRK.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err)
|
t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err)
|
||||||
}
|
}
|
||||||
@ -103,7 +102,7 @@ func TestSTMPutNewKey(t *testing.T) {
|
|||||||
t.Fatalf("error on stm txn (%v)", err)
|
t.Fatalf("error on stm txn (%v)", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo")
|
rk, err := recipe.GetRemoteKV(etcdc, "foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error fetching key (%v)", err)
|
t.Fatalf("error fetching key (%v)", err)
|
||||||
}
|
}
|
||||||
@ -129,7 +128,7 @@ func TestSTMAbort(t *testing.T) {
|
|||||||
t.Fatalf("error on stm txn (%v)", err)
|
t.Fatalf("error on stm txn (%v)", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo")
|
rk, err := recipe.GetRemoteKV(etcdc, "foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error fetching key (%v)", err)
|
t.Fatalf("error fetching key (%v)", err)
|
||||||
}
|
}
|
||||||
|
@ -180,7 +180,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
|
|
||||||
wAPI := clus.RandClient().Watch
|
wAPI := toGRPC(clus.RandClient()).Watch
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
wStream, err := wAPI.Watch(ctx)
|
wStream, err := wAPI.Watch(ctx)
|
||||||
@ -212,7 +212,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||||||
// asynchronously create keys
|
// asynchronously create keys
|
||||||
go func() {
|
go func() {
|
||||||
for _, k := range tt.putKeys {
|
for _, k := range tt.putKeys {
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
|
req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
|
||||||
if _, err := kvc.Put(context.TODO(), req); err != nil {
|
if _, err := kvc.Put(context.TODO(), req); err != nil {
|
||||||
t.Fatalf("#%d: couldn't put key (%v)", i, err)
|
t.Fatalf("#%d: couldn't put key (%v)", i, err)
|
||||||
@ -273,7 +273,7 @@ func testV3WatchCancel(t *testing.T, startRev int64) {
|
|||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
wStream, errW := clus.RandClient().Watch.Watch(ctx)
|
wStream, errW := toGRPC(clus.RandClient()).Watch.Watch(ctx)
|
||||||
if errW != nil {
|
if errW != nil {
|
||||||
t.Fatalf("wAPI.Watch error: %v", errW)
|
t.Fatalf("wAPI.Watch error: %v", errW)
|
||||||
}
|
}
|
||||||
@ -308,7 +308,7 @@ func testV3WatchCancel(t *testing.T, startRev int64) {
|
|||||||
t.Errorf("cresp.Canceled got = %v, want = true", cresp.Canceled)
|
t.Errorf("cresp.Canceled got = %v, want = true", cresp.Canceled)
|
||||||
}
|
}
|
||||||
|
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
|
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
|
||||||
t.Errorf("couldn't put key (%v)", err)
|
t.Errorf("couldn't put key (%v)", err)
|
||||||
}
|
}
|
||||||
@ -331,7 +331,7 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) {
|
|||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
wStream, wErr := clus.RandClient().Watch.Watch(ctx)
|
wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
|
||||||
if wErr != nil {
|
if wErr != nil {
|
||||||
t.Fatalf("wAPI.Watch error: %v", wErr)
|
t.Fatalf("wAPI.Watch error: %v", wErr)
|
||||||
}
|
}
|
||||||
@ -341,7 +341,7 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) {
|
|||||||
// first revision already allocated as empty revision
|
// first revision already allocated as empty revision
|
||||||
for i := 1; i < nrRevisions; i++ {
|
for i := 1; i < nrRevisions; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
req := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
req := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
|
||||||
if _, err := kvc.Put(context.TODO(), req); err != nil {
|
if _, err := kvc.Put(context.TODO(), req); err != nil {
|
||||||
t.Fatalf("couldn't put key (%v)", err)
|
t.Fatalf("couldn't put key (%v)", err)
|
||||||
@ -418,11 +418,11 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) {
|
|||||||
// one watcher to test if it receives expected events.
|
// one watcher to test if it receives expected events.
|
||||||
func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
|
func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
|
||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
wStream, errW := clus.RandClient().Watch.Watch(ctx)
|
wStream, errW := toGRPC(clus.RandClient()).Watch.Watch(ctx)
|
||||||
if errW != nil {
|
if errW != nil {
|
||||||
t.Fatalf("wAPI.Watch error: %v", errW)
|
t.Fatalf("wAPI.Watch error: %v", errW)
|
||||||
}
|
}
|
||||||
@ -523,7 +523,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
|
|||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
wStream, wErr := clus.RandClient().Watch.Watch(ctx)
|
wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
|
||||||
if wErr != nil {
|
if wErr != nil {
|
||||||
t.Fatalf("wAPI.Watch error: %v", wErr)
|
t.Fatalf("wAPI.Watch error: %v", wErr)
|
||||||
}
|
}
|
||||||
@ -535,7 +535,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
|
|||||||
t.Fatalf("wStream.Send error: %v", err)
|
t.Fatalf("wStream.Send error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
txn := pb.TxnRequest{}
|
txn := pb.TxnRequest{}
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
ru := &pb.RequestUnion{}
|
ru := &pb.RequestUnion{}
|
||||||
@ -605,7 +605,7 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
|
|||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
|
|
||||||
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
|
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
|
||||||
t.Fatalf("couldn't put key (%v)", err)
|
t.Fatalf("couldn't put key (%v)", err)
|
||||||
@ -616,7 +616,7 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
|
|||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
wStream, wErr := clus.RandClient().Watch.Watch(ctx)
|
wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
|
||||||
if wErr != nil {
|
if wErr != nil {
|
||||||
t.Fatalf("wAPI.Watch error: %v", wErr)
|
t.Fatalf("wAPI.Watch error: %v", wErr)
|
||||||
}
|
}
|
||||||
@ -692,8 +692,8 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) {
|
|||||||
// testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
|
// testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
|
||||||
func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
|
func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
|
||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
wAPI := clus.RandClient().Watch
|
wAPI := toGRPC(clus.RandClient()).Watch
|
||||||
kvc := clus.RandClient().KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
|
|
||||||
streams := make([]pb.Watch_WatchClient, 5)
|
streams := make([]pb.Watch_WatchClient, 5)
|
||||||
for i := range streams {
|
for i := range streams {
|
||||||
@ -792,7 +792,7 @@ func TestV3WatchInvalidFutureRevision(t *testing.T) {
|
|||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
wStream, wErr := clus.RandClient().Watch.Watch(ctx)
|
wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
|
||||||
if wErr != nil {
|
if wErr != nil {
|
||||||
t.Fatalf("wAPI.Watch error: %v", wErr)
|
t.Fatalf("wAPI.Watch error: %v", wErr)
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,7 @@ import (
|
|||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
)
|
)
|
||||||
|
|
||||||
// putCmd represents the put command
|
// putCmd represents the put command
|
||||||
@ -61,10 +61,10 @@ func putFunc(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
results = make(chan result)
|
results = make(chan result)
|
||||||
requests := make(chan etcdserverpb.PutRequest, totalClients)
|
requests := make(chan v3.Op, totalClients)
|
||||||
bar = pb.New(putTotal)
|
bar = pb.New(putTotal)
|
||||||
|
|
||||||
k, v := make([]byte, keySize), mustRandBytes(valSize)
|
k, v := make([]byte, keySize), string(mustRandBytes(valSize))
|
||||||
|
|
||||||
clients := mustCreateClients(totalClients, totalConns)
|
clients := mustCreateClients(totalClients, totalConns)
|
||||||
|
|
||||||
@ -73,7 +73,7 @@ func putFunc(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
for i := range clients {
|
for i := range clients {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go doPut(context.Background(), clients[i].KV, requests)
|
go doPut(context.Background(), clients[i], requests)
|
||||||
}
|
}
|
||||||
|
|
||||||
pdoneC := printReport(results)
|
pdoneC := printReport(results)
|
||||||
@ -85,7 +85,7 @@ func putFunc(cmd *cobra.Command, args []string) {
|
|||||||
} else {
|
} else {
|
||||||
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
|
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
|
||||||
}
|
}
|
||||||
requests <- etcdserverpb.PutRequest{Key: k, Value: v}
|
requests <- v3.OpPut(string(k), v)
|
||||||
}
|
}
|
||||||
close(requests)
|
close(requests)
|
||||||
}()
|
}()
|
||||||
@ -98,12 +98,12 @@ func putFunc(cmd *cobra.Command, args []string) {
|
|||||||
<-pdoneC
|
<-pdoneC
|
||||||
}
|
}
|
||||||
|
|
||||||
func doPut(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) {
|
func doPut(ctx context.Context, client v3.KV, requests <-chan v3.Op) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
for r := range requests {
|
for op := range requests {
|
||||||
st := time.Now()
|
st := time.Now()
|
||||||
_, err := client.Put(ctx, &r)
|
_, err := client.Do(ctx, op)
|
||||||
|
|
||||||
var errStr string
|
var errStr string
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -22,7 +22,7 @@ import (
|
|||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
)
|
)
|
||||||
|
|
||||||
// rangeCmd represents the range command
|
// rangeCmd represents the range command
|
||||||
@ -50,10 +50,10 @@ func rangeFunc(cmd *cobra.Command, args []string) {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
k := []byte(args[0])
|
k := args[0]
|
||||||
var end []byte
|
end := ""
|
||||||
if len(args) == 2 {
|
if len(args) == 2 {
|
||||||
end = []byte(args[1])
|
end = args[1]
|
||||||
}
|
}
|
||||||
|
|
||||||
if rangeConsistency == "l" {
|
if rangeConsistency == "l" {
|
||||||
@ -66,7 +66,7 @@ func rangeFunc(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
results = make(chan result)
|
results = make(chan result)
|
||||||
requests := make(chan etcdserverpb.RangeRequest, totalClients)
|
requests := make(chan v3.Op, totalClients)
|
||||||
bar = pb.New(rangeTotal)
|
bar = pb.New(rangeTotal)
|
||||||
|
|
||||||
clients := mustCreateClients(totalClients, totalConns)
|
clients := mustCreateClients(totalClients, totalConns)
|
||||||
@ -83,11 +83,12 @@ func rangeFunc(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for i := 0; i < rangeTotal; i++ {
|
for i := 0; i < rangeTotal; i++ {
|
||||||
r := etcdserverpb.RangeRequest{Key: k, RangeEnd: end}
|
opts := []v3.OpOption{v3.WithRange(end)}
|
||||||
if rangeConsistency == "s" {
|
if rangeConsistency == "s" {
|
||||||
r.Serializable = true
|
opts = append(opts, v3.WithSerializable())
|
||||||
}
|
}
|
||||||
requests <- r
|
op := v3.OpGet(k, opts...)
|
||||||
|
requests <- op
|
||||||
}
|
}
|
||||||
close(requests)
|
close(requests)
|
||||||
}()
|
}()
|
||||||
@ -100,12 +101,12 @@ func rangeFunc(cmd *cobra.Command, args []string) {
|
|||||||
<-pdoneC
|
<-pdoneC
|
||||||
}
|
}
|
||||||
|
|
||||||
func doRange(client etcdserverpb.KVClient, requests <-chan etcdserverpb.RangeRequest) {
|
func doRange(client v3.KV, requests <-chan v3.Op) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
for req := range requests {
|
for op := range requests {
|
||||||
st := time.Now()
|
st := time.Now()
|
||||||
_, err := client.Range(context.Background(), &req)
|
_, err := client.Do(context.Background(), op)
|
||||||
|
|
||||||
var errStr string
|
var errStr string
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -20,7 +20,7 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
|
||||||
@ -73,23 +73,18 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func watchFunc(cmd *cobra.Command, args []string) {
|
func watchFunc(cmd *cobra.Command, args []string) {
|
||||||
watched := make([][]byte, watchedKeyTotal)
|
watched := make([]string, watchedKeyTotal)
|
||||||
for i := range watched {
|
for i := range watched {
|
||||||
watched[i] = mustRandBytes(32)
|
watched[i] = string(mustRandBytes(32))
|
||||||
}
|
}
|
||||||
|
|
||||||
requests := make(chan etcdserverpb.WatchRequest, totalClients)
|
requests := make(chan string, totalClients)
|
||||||
|
|
||||||
clients := mustCreateClients(totalClients, totalConns)
|
clients := mustCreateClients(totalClients, totalConns)
|
||||||
|
|
||||||
streams := make([]etcdserverpb.Watch_WatchClient, watchTotalStreams)
|
streams := make([]v3.Watcher, watchTotalStreams)
|
||||||
var err error
|
|
||||||
for i := range streams {
|
for i := range streams {
|
||||||
streams[i], err = clients[i%len(clients)].Watch.Watch(context.TODO())
|
streams[i] = v3.NewWatcher(clients[i%len(clients)])
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintln(os.Stderr, "Failed to create watch stream:", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
putStartNotifier = make(chan struct{})
|
putStartNotifier = make(chan struct{})
|
||||||
@ -111,10 +106,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for i := 0; i < watchTotal; i++ {
|
for i := 0; i < watchTotal; i++ {
|
||||||
requests <- etcdserverpb.WatchRequest{
|
requests <- watched[i%len(watched)]
|
||||||
RequestUnion: &etcdserverpb.WatchRequest_CreateRequest{
|
|
||||||
CreateRequest: &etcdserverpb.WatchCreateRequest{
|
|
||||||
Key: watched[i%(len(watched))]}}}
|
|
||||||
}
|
}
|
||||||
close(requests)
|
close(requests)
|
||||||
}()
|
}()
|
||||||
@ -139,7 +131,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
|
|||||||
recvCompletedNotifier = make(chan struct{})
|
recvCompletedNotifier = make(chan struct{})
|
||||||
close(putStartNotifier)
|
close(putStartNotifier)
|
||||||
|
|
||||||
putreqc := make(chan etcdserverpb.PutRequest)
|
putreqc := make(chan v3.Op)
|
||||||
|
|
||||||
for i := 0; i < watchPutTotal; i++ {
|
for i := 0; i < watchPutTotal; i++ {
|
||||||
go doPutForWatch(context.TODO(), clients[i%len(clients)].KV, putreqc)
|
go doPutForWatch(context.TODO(), clients[i%len(clients)].KV, putreqc)
|
||||||
@ -149,10 +141,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for i := 0; i < eventsTotal; i++ {
|
for i := 0; i < eventsTotal; i++ {
|
||||||
putreqc <- etcdserverpb.PutRequest{
|
putreqc <- v3.OpPut(watched[i%(len(watched))], "data")
|
||||||
Key: watched[i%(len(watched))],
|
|
||||||
Value: []byte("data"),
|
|
||||||
}
|
|
||||||
// TODO: use a real rate-limiter instead of sleep.
|
// TODO: use a real rate-limiter instead of sleep.
|
||||||
time.Sleep(time.Second / time.Duration(watchPutRate))
|
time.Sleep(time.Second / time.Duration(watchPutRate))
|
||||||
}
|
}
|
||||||
@ -166,16 +155,17 @@ func watchFunc(cmd *cobra.Command, args []string) {
|
|||||||
<-pdoneC
|
<-pdoneC
|
||||||
}
|
}
|
||||||
|
|
||||||
func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb.WatchRequest) {
|
func doWatch(stream v3.Watcher, requests <-chan string) {
|
||||||
for r := range requests {
|
for r := range requests {
|
||||||
st := time.Now()
|
st := time.Now()
|
||||||
err := stream.Send(&r)
|
wch := stream.Watch(context.TODO(), r)
|
||||||
var errStr string
|
var errStr string
|
||||||
if err != nil {
|
if wch == nil {
|
||||||
errStr = err.Error()
|
errStr = "could not open watch channel"
|
||||||
}
|
}
|
||||||
results <- result{errStr: errStr, duration: time.Since(st)}
|
results <- result{errStr: errStr, duration: time.Since(st)}
|
||||||
bar.Increment()
|
bar.Increment()
|
||||||
|
go recvWatchChan(wch)
|
||||||
}
|
}
|
||||||
atomic.AddInt32(&nrWatchCompleted, 1)
|
atomic.AddInt32(&nrWatchCompleted, 1)
|
||||||
if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) {
|
if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) {
|
||||||
@ -183,15 +173,12 @@ func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb
|
|||||||
}
|
}
|
||||||
|
|
||||||
<-putStartNotifier
|
<-putStartNotifier
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
func recvWatchChan(wch v3.WatchChan) {
|
||||||
|
for range wch {
|
||||||
st := time.Now()
|
st := time.Now()
|
||||||
_, err := stream.Recv()
|
results <- result{duration: time.Since(st)}
|
||||||
var errStr string
|
|
||||||
if err != nil {
|
|
||||||
errStr = err.Error()
|
|
||||||
}
|
|
||||||
results <- result{errStr: errStr, duration: time.Since(st)}
|
|
||||||
bar.Increment()
|
bar.Increment()
|
||||||
|
|
||||||
atomic.AddInt32(&nrRecvCompleted, 1)
|
atomic.AddInt32(&nrRecvCompleted, 1)
|
||||||
@ -201,11 +188,11 @@ func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func doPutForWatch(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) {
|
func doPutForWatch(ctx context.Context, client v3.KV, requests <-chan v3.Op) {
|
||||||
for r := range requests {
|
for op := range requests {
|
||||||
_, err := client.Put(ctx, &r)
|
_, err := client.Do(ctx, op)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintln(os.Stderr, "failed to Put for watch benchmark: %s", err)
|
fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user