mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: goword spelling check
This commit is contained in:
parent
4587d56731
commit
2d0eec0b35
24
.words
Normal file
24
.words
Normal file
@ -0,0 +1,24 @@
|
||||
RPC
|
||||
RPCs
|
||||
cancelable
|
||||
cancelation
|
||||
defragment
|
||||
defragmenting
|
||||
etcd
|
||||
gRPC
|
||||
goroutine
|
||||
goroutines
|
||||
iff
|
||||
inflight
|
||||
keepalive
|
||||
keepalives
|
||||
keyspace
|
||||
linearization
|
||||
localhost
|
||||
mutex
|
||||
prefetching
|
||||
protobuf
|
||||
serializable
|
||||
teardown
|
||||
uncontended
|
||||
unprefixed
|
@ -41,10 +41,10 @@ type simpleBalancer struct {
|
||||
readyc chan struct{}
|
||||
readyOnce sync.Once
|
||||
|
||||
// mu protects upEps, pinAddr, and connectingAddr
|
||||
// mu protects all fields below.
|
||||
mu sync.RWMutex
|
||||
|
||||
// upc closes when upEps transitions from empty to non-zero or the balancer closes.
|
||||
// upc closes when pinAddr transitions from empty to non-empty or the balancer closes.
|
||||
upc chan struct{}
|
||||
|
||||
// downc closes when grpc calls down() on pinAddr
|
||||
@ -65,7 +65,7 @@ type simpleBalancer struct {
|
||||
host2ep map[string]string
|
||||
|
||||
// pinAddr is the currently pinned address; set to the empty string on
|
||||
// intialization and shutdown.
|
||||
// initialization and shutdown.
|
||||
pinAddr string
|
||||
|
||||
closed bool
|
||||
@ -234,8 +234,8 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// gRPC might call Up after it called Close. We add this check
|
||||
// to "fix" it up at application layer. Or our simplerBalancer
|
||||
// might panic since b.upc is closed.
|
||||
// to "fix" it up at application layer. Otherwise, will panic
|
||||
// if b.upc is already closed.
|
||||
if b.closed {
|
||||
return func(err error) {}
|
||||
}
|
||||
@ -327,8 +327,8 @@ func (b *simpleBalancer) Close() error {
|
||||
|
||||
// In the case of following scenario:
|
||||
// 1. upc is not closed; no pinned address
|
||||
// 2. client issues an rpc, calling invoke(), which calls Get(), enters for loop, blocks
|
||||
// 3. clientconn.Close() calls balancer.Close(); closed = true
|
||||
// 2. client issues an RPC, calling invoke(), which calls Get(), enters for loop, blocks
|
||||
// 3. client.conn.Close() calls balancer.Close(); closed = true
|
||||
// 4. for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled
|
||||
// we must close upc so Get() exits from blocking on upc
|
||||
select {
|
||||
|
@ -59,9 +59,9 @@ type Client struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
// Username is a username for authentication
|
||||
// Username is a user name for authentication.
|
||||
Username string
|
||||
// Password is a password for authentication
|
||||
// Password is a password for authentication.
|
||||
Password string
|
||||
// tokenCred is an instance of WithPerRPCCredentials()'s argument
|
||||
tokenCred *authTokenCredential
|
||||
@ -216,11 +216,8 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts
|
||||
}
|
||||
if c.cfg.DialKeepAliveTime > 0 {
|
||||
params := keepalive.ClientParameters{
|
||||
Time: c.cfg.DialKeepAliveTime,
|
||||
}
|
||||
// Only relevant when KeepAliveTime is non-zero
|
||||
if c.cfg.DialKeepAliveTimeout > 0 {
|
||||
params.Timeout = c.cfg.DialKeepAliveTimeout
|
||||
Time: c.cfg.DialKeepAliveTime,
|
||||
Timeout: c.cfg.DialKeepAliveTimeout,
|
||||
}
|
||||
opts = append(opts, grpc.WithKeepaliveParams(params))
|
||||
}
|
||||
@ -377,7 +374,7 @@ func newClient(cfg *Config) (*Client, error) {
|
||||
|
||||
client.balancer = newSimpleBalancer(cfg.Endpoints)
|
||||
// use Endpoints[0] so that for https:// without any tls config given, then
|
||||
// grpc will assume the ServerName is in the endpoint.
|
||||
// grpc will assume the certificate server name is the endpoint host.
|
||||
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
|
||||
if err != nil {
|
||||
client.cancel()
|
||||
|
@ -44,7 +44,7 @@ func TestDialCancel(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// connect to ipv4 blackhole so dial blocks
|
||||
// connect to ipv4 black hole so dial blocks
|
||||
c.SetEndpoints("http://254.0.0.1:12345")
|
||||
|
||||
// issue Get to force redial attempts
|
||||
@ -96,7 +96,7 @@ func TestDialTimeout(t *testing.T) {
|
||||
for i, cfg := range testCfgs {
|
||||
donec := make(chan error)
|
||||
go func() {
|
||||
// without timeout, dial continues forever on ipv4 blackhole
|
||||
// without timeout, dial continues forever on ipv4 black hole
|
||||
c, err := New(cfg)
|
||||
if c != nil || err == nil {
|
||||
t.Errorf("#%d: new client should fail", i)
|
||||
|
@ -33,7 +33,7 @@ func ExampleKeyExists_put() {
|
||||
kvc := clientv3.NewKV(cli)
|
||||
|
||||
// perform a put only if key is missing
|
||||
// It is useful to do the check (transactionally) to avoid overwriting
|
||||
// It is useful to do the check atomically to avoid overwriting
|
||||
// the existing key which would generate potentially unwanted events,
|
||||
// unless of course you wanted to do an overwrite no matter what.
|
||||
_, err = kvc.Txn(context.Background()).
|
||||
|
@ -44,10 +44,8 @@ func (op CompactOp) toRequest() *pb.CompactionRequest {
|
||||
return &pb.CompactionRequest{Revision: op.revision, Physical: op.physical}
|
||||
}
|
||||
|
||||
// WithCompactPhysical makes compact RPC call wait until
|
||||
// the compaction is physically applied to the local database
|
||||
// such that compacted entries are totally removed from the
|
||||
// backend database.
|
||||
// WithCompactPhysical makes Compact wait until all compacted entries are
|
||||
// removed from the etcd server's storage.
|
||||
func WithCompactPhysical() CompactOption {
|
||||
return func(op *CompactOp) { op.physical = true }
|
||||
}
|
||||
|
@ -185,12 +185,12 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
// only accept PUTs; a DELETE will make observe() spin
|
||||
// only accept puts; a delete will make observe() spin
|
||||
for _, ev := range wr.Events {
|
||||
if ev.Type == mvccpb.PUT {
|
||||
hdr, kv = &wr.Header, ev.Kv
|
||||
// may have multiple revs; hdr.rev = the last rev
|
||||
// set to kv's rev in case batch has multiple PUTs
|
||||
// set to kv's rev in case batch has multiple Puts
|
||||
hdr.Revision = kv.ModRevision
|
||||
break
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ func ExampleSTM_apply() {
|
||||
xfer := fromInt / 2
|
||||
fromInt, toInt = fromInt-xfer, toInt+xfer
|
||||
|
||||
// writeback
|
||||
// write back
|
||||
stm.Put(fromK, fmt.Sprintf("%d", fromInt))
|
||||
stm.Put(toK, fmt.Sprintf("%d", toInt))
|
||||
return nil
|
||||
|
@ -44,7 +44,7 @@ type Config struct {
|
||||
// TLS holds the client secure credentials, if any.
|
||||
TLS *tls.Config
|
||||
|
||||
// Username is a username for authentication.
|
||||
// Username is a user name for authentication.
|
||||
Username string `json:"username"`
|
||||
|
||||
// Password is a password for authentication.
|
||||
|
@ -28,7 +28,7 @@
|
||||
// Make sure to close the client after using it. If the client is not closed, the
|
||||
// connection will have leaky goroutines.
|
||||
//
|
||||
// To specify client request timeout, pass context.WithTimeout to APIs:
|
||||
// To specify a client request timeout, wrap the context with context.WithTimeout:
|
||||
//
|
||||
// ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
// resp, err := kvc.Put(ctx, "sample_key", "sample_value")
|
||||
|
@ -236,8 +236,11 @@ func ExampleKV_txn() {
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
_, err = kvc.Txn(ctx).
|
||||
If(clientv3.Compare(clientv3.Value("key"), ">", "abc")). // txn value comparisons are lexical
|
||||
Then(clientv3.OpPut("key", "XYZ")). // this runs, since 'xyz' > 'abc'
|
||||
// txn value comparisons are lexical
|
||||
If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
|
||||
// the "Then" runs, since "xyz" > "abc"
|
||||
Then(clientv3.OpPut("key", "XYZ")).
|
||||
// the "Else" does not run
|
||||
Else(clientv3.OpPut("key", "ABC")).
|
||||
Commit()
|
||||
cancel()
|
||||
|
@ -34,20 +34,15 @@ func ExampleMaintenance_status() {
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
// resp, err := cli.Status(context.Background(), ep)
|
||||
//
|
||||
// or
|
||||
//
|
||||
mapi := clientv3.NewMaintenance(cli)
|
||||
resp, err := mapi.Status(context.Background(), ep)
|
||||
resp, err := cli.Status(context.Background(), ep)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Printf("endpoint: %s / IsLeader: %v\n", ep, resp.Header.MemberId == resp.Leader)
|
||||
fmt.Printf("endpoint: %s / Leader: %v\n", ep, resp.Header.MemberId == resp.Leader)
|
||||
}
|
||||
// endpoint: localhost:2379 / IsLeader: false
|
||||
// endpoint: localhost:22379 / IsLeader: false
|
||||
// endpoint: localhost:32379 / IsLeader: true
|
||||
// endpoint: localhost:2379 / Leader: false
|
||||
// endpoint: localhost:22379 / Leader: false
|
||||
// endpoint: localhost:32379 / Leader: true
|
||||
}
|
||||
|
||||
func ExampleMaintenance_defragment() {
|
||||
|
@ -43,7 +43,7 @@ func ExampleClient_metrics() {
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
// get a key so it shows up in the metrics as a range rpc
|
||||
// get a key so it shows up in the metrics as a range RPC
|
||||
cli.Get(context.TODO(), "test_key")
|
||||
|
||||
// listen for all prometheus metrics
|
||||
@ -80,5 +80,6 @@ func ExampleClient_metrics() {
|
||||
break
|
||||
}
|
||||
}
|
||||
// Output: grpc_client_started_total{grpc_method="Range",grpc_service="etcdserverpb.KV",grpc_type="unary"} 1
|
||||
// Output:
|
||||
// grpc_client_started_total{grpc_method="Range",grpc_service="etcdserverpb.KV",grpc_type="unary"} 1
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ func TestDialTLSExpired(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// expect remote errors 'tls: bad certificate'
|
||||
// expect remote errors "tls: bad certificate"
|
||||
_, err = clientv3.New(clientv3.Config{
|
||||
Endpoints: []string{clus.Members[0].GRPCAddr()},
|
||||
DialTimeout: 3 * time.Second,
|
||||
@ -72,7 +72,7 @@ func TestDialTLSNoConfig(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, ClientTLS: &testTLSInfo})
|
||||
defer clus.Terminate(t)
|
||||
// expect 'signed by unknown authority'
|
||||
// expect "signed by unknown authority"
|
||||
_, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: []string{clus.Members[0].GRPCAddr()},
|
||||
DialTimeout: time.Second,
|
||||
@ -82,7 +82,8 @@ func TestDialTLSNoConfig(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDialSetEndpoints ensures SetEndpoints can replace unavailable endpoints with available ones.
|
||||
// TestDialSetEndpointsBeforeFail ensures SetEndpoints can replace unavailable
|
||||
// endpoints with available ones.
|
||||
func TestDialSetEndpointsBeforeFail(t *testing.T) {
|
||||
testDialSetEndpoints(t, true)
|
||||
}
|
||||
@ -190,7 +191,7 @@ func TestDialForeignEndpoint(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSetEndpointAndPut checks that a Put following a SetEndpoint
|
||||
// TestSetEndpointAndPut checks that a Put following a SetEndpoints
|
||||
// to a working endpoint will always succeed.
|
||||
func TestSetEndpointAndPut(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
@ -824,8 +824,8 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestKVGetOneEndpointDown ensures a client can connect and get if one endpoint is down
|
||||
func TestKVPutOneEndpointDown(t *testing.T) {
|
||||
// TestKVGetOneEndpointDown ensures a client can connect and get if one endpoint is down.
|
||||
func TestKVGetOneEndpointDown(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
@ -233,7 +233,7 @@ type leaseCh struct {
|
||||
ch <-chan *clientv3.LeaseKeepAliveResponse
|
||||
}
|
||||
|
||||
// TestLeaseKeepAliveNotFound ensures a revoked lease won't stop other keep alives
|
||||
// TestLeaseKeepAliveNotFound ensures a revoked lease won't halt other leases.
|
||||
func TestLeaseKeepAliveNotFound(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
@ -288,9 +288,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
|
||||
_, err := cli.Grant(context.TODO(), 5)
|
||||
if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled {
|
||||
// grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close.
|
||||
// context.Canceled if grpc-go balancer calls 'Get' with inflight client.Close,
|
||||
// soon transportMonitor selects on ClientTransport.Error() and resetTransport(false)
|
||||
// that cancels the context and closes the transport.
|
||||
// context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close.
|
||||
t.Fatalf("expected %v or %v, got %v", grpc.ErrClientConnClosing, context.Canceled, err)
|
||||
}
|
||||
}()
|
||||
@ -364,7 +362,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestLeaseKeepAliveCloseAfterDisconnectExpire ensures the keep alive channel is closed
|
||||
// TestLeaseKeepAliveCloseAfterDisconnectRevoke ensures the keep alive channel is closed
|
||||
// following a disconnection, lease revoke, then reconnect.
|
||||
func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
@ -399,7 +397,7 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
|
||||
|
||||
clus.Members[0].Restart(t)
|
||||
|
||||
// some keep-alives may still be buffered; drain until close
|
||||
// some responses may still be buffered; drain until close
|
||||
timer := time.After(time.Duration(kresp.TTL) * time.Second)
|
||||
for kresp != nil {
|
||||
select {
|
||||
@ -555,8 +553,7 @@ func TestLeaseTimeToLiveLeaseNotFound(t *testing.T) {
|
||||
}
|
||||
|
||||
lresp, err := cli.TimeToLive(context.Background(), resp.ID)
|
||||
// TimeToLive() doesn't return LeaseNotFound error
|
||||
// but return a response with TTL to be -1
|
||||
// TimeToLive() should return a response with TTL=-1.
|
||||
if err != nil {
|
||||
t.Fatalf("expected err to be nil")
|
||||
}
|
||||
@ -677,8 +674,8 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3LeaseFailureOverlap issues Grant and Keepalive requests to a cluster
|
||||
// before, during, and after quorum loss to confirm Grant/Keepalive tolerates
|
||||
// TestV3LeaseFailureOverlap issues Grant and KeepAlive requests to a cluster
|
||||
// before, during, and after quorum loss to confirm Grant/KeepAlive tolerates
|
||||
// transient cluster failure.
|
||||
func TestV3LeaseFailureOverlap(t *testing.T) {
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
|
||||
|
@ -157,7 +157,7 @@ func TestLeasingPutInvalidateNew(t *testing.T) {
|
||||
}
|
||||
|
||||
// TestLeasingPutInvalidateExisting checks the leasing KV updates its cache on a Put to an existing key.
|
||||
func TestLeasingPutInvalidatExisting(t *testing.T) {
|
||||
func TestLeasingPutInvalidateExisting(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
@ -190,7 +190,7 @@ func TestLeasingPutInvalidatExisting(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestLeasingGetLease checks that keys with TTLs are not leased.
|
||||
// TestLeasingGetNoLeaseTTL checks a key with a TTL is not leased.
|
||||
func TestLeasingGetNoLeaseTTL(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
@ -259,7 +259,7 @@ func TestLeasingGetSerializable(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestLeasingPrevKey checks the cache respects the PrevKV flag on puts.
|
||||
// TestLeasingPrevKey checks the cache respects WithPrevKV on puts.
|
||||
func TestLeasingPrevKey(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
|
||||
@ -272,11 +272,10 @@ func TestLeasingPrevKey(t *testing.T) {
|
||||
if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// fetch without prevkv to acquire leasing key
|
||||
// acquire leasing key
|
||||
if _, err = lkv.Get(context.TODO(), "k"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// fetch prevkv via put
|
||||
resp, err := lkv.Put(context.TODO(), "k", "def", clientv3.WithPrevKV())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -889,7 +888,7 @@ func TestLeasingTxnNonOwnerPut(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestLeasingTxnRandIfThen randomly leases keys two separate clients, then
|
||||
// TestLeasingTxnRandIfThenOrElse randomly leases keys two separate clients, then
|
||||
// issues a random If/{Then,Else} transaction on those keys to one client.
|
||||
func TestLeasingTxnRandIfThenOrElse(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
@ -1286,7 +1285,7 @@ func TestLeasingPutGetDeleteConcurrent(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestLeasingReconnectRevoke checks that revocation works if
|
||||
// TestLeasingReconnectOwnerRevoke checks that revocation works if
|
||||
// disconnected when trying to submit revoke txn.
|
||||
func TestLeasingReconnectOwnerRevoke(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
@ -1312,7 +1311,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) {
|
||||
|
||||
cctx, cancel := context.WithCancel(context.TODO())
|
||||
sdonec, pdonec := make(chan struct{}), make(chan struct{})
|
||||
// make lkv1 connection choppy so txns fail
|
||||
// make lkv1 connection choppy so Txn fails
|
||||
go func() {
|
||||
defer close(sdonec)
|
||||
for i := 0; i < 10 && cctx.Err() == nil; i++ {
|
||||
@ -1346,7 +1345,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestLeasingReconnectRevokeCompaction checks that revocation works if
|
||||
// TestLeasingReconnectOwnerRevokeCompact checks that revocation works if
|
||||
// disconnected and the watch is compacted.
|
||||
func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
@ -1551,7 +1550,7 @@ func TestLeasingTxnAtomicCache(t *testing.T) {
|
||||
wgGetters.Wait()
|
||||
}
|
||||
|
||||
// TestLeasingReconnectTxn checks that txns are resilient to disconnects.
|
||||
// TestLeasingReconnectTxn checks that Txn is resilient to disconnects.
|
||||
func TestLeasingReconnectTxn(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
|
@ -62,7 +62,7 @@ func TestUserErrorAuth(t *testing.T) {
|
||||
authapi := clus.RandClient()
|
||||
authSetupRoot(t, authapi.Auth)
|
||||
|
||||
// un-authenticated client
|
||||
// unauthenticated client
|
||||
if _, err := authapi.UserAdd(context.TODO(), "foo", "bar"); err != rpctypes.ErrUserNotFound {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrUserNotFound, err)
|
||||
}
|
||||
|
@ -52,8 +52,8 @@ func runWatchTest(t *testing.T, f watcherTest) {
|
||||
|
||||
wclientMember := rand.Intn(3)
|
||||
w := clus.Client(wclientMember).Watcher
|
||||
// select a different client from wclient so puts succeed if
|
||||
// a test knocks out the watcher client
|
||||
// select a different client for KV operations so puts succeed if
|
||||
// a test knocks out the watcher client.
|
||||
kvMember := rand.Intn(3)
|
||||
for kvMember == wclientMember {
|
||||
kvMember = rand.Intn(3)
|
||||
@ -804,7 +804,8 @@ func TestWatchWithFilter(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchWithCreatedNotification checks that createdNotification works.
|
||||
// TestWatchWithCreatedNotification checks that WithCreatedNotify returns a
|
||||
// Created watch response.
|
||||
func TestWatchWithCreatedNotification(t *testing.T) {
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
@ -841,8 +842,7 @@ func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
|
||||
|
||||
cluster.Members[0].DropConnections()
|
||||
|
||||
// try to receive from watch channel again
|
||||
// ensure it doesn't post another createNotify
|
||||
// check watch channel doesn't post another watch response.
|
||||
select {
|
||||
case wresp := <-wch:
|
||||
t.Fatalf("got unexpected watch response: %+v\n", wresp)
|
||||
@ -860,7 +860,7 @@ func TestWatchCancelOnServer(t *testing.T) {
|
||||
client := cluster.RandClient()
|
||||
numWatches := 10
|
||||
|
||||
// grpcproxy starts watches to detect leadership after the proxy server
|
||||
// The grpc proxy starts watches to detect leadership after the proxy server
|
||||
// returns as started; to avoid racing on the proxy's internal watches, wait
|
||||
// until require leader watches get create responses to ensure the leadership
|
||||
// watches have started.
|
||||
@ -966,7 +966,7 @@ func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3))
|
||||
t.Fatalf("unexpected closed channel %p", wch)
|
||||
}
|
||||
// may take a second or two to reestablish a watcher because of
|
||||
// grpc backoff policies for disconnects
|
||||
// grpc back off policies for disconnects
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Errorf("timed out waiting for watch on %p", wch)
|
||||
}
|
||||
@ -990,7 +990,7 @@ func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3))
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchCanelAndCloseClient ensures that canceling a watcher then immediately
|
||||
// TestWatchCancelAndCloseClient ensures that canceling a watcher then immediately
|
||||
// closing the client does not return a client closing error.
|
||||
func TestWatchCancelAndCloseClient(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
@ -30,7 +30,7 @@ type (
|
||||
LeaseID int64
|
||||
)
|
||||
|
||||
// LeaseGrantResponse is used to convert the protobuf grant response.
|
||||
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
|
||||
type LeaseGrantResponse struct {
|
||||
*pb.ResponseHeader
|
||||
ID LeaseID
|
||||
@ -38,14 +38,14 @@ type LeaseGrantResponse struct {
|
||||
Error string
|
||||
}
|
||||
|
||||
// LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
|
||||
// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
|
||||
type LeaseKeepAliveResponse struct {
|
||||
*pb.ResponseHeader
|
||||
ID LeaseID
|
||||
TTL int64
|
||||
}
|
||||
|
||||
// LeaseTimeToLiveResponse is used to convert the protobuf lease timetolive response.
|
||||
// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
|
||||
type LeaseTimeToLiveResponse struct {
|
||||
*pb.ResponseHeader
|
||||
ID LeaseID `json:"id"`
|
||||
@ -66,7 +66,7 @@ type LeaseStatus struct {
|
||||
// TODO: TTL int64
|
||||
}
|
||||
|
||||
// LeaseLeasesResponse is used to convert the protobuf lease list response.
|
||||
// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
|
||||
type LeaseLeasesResponse struct {
|
||||
*pb.ResponseHeader
|
||||
Leases []LeaseStatus `json:"leases"`
|
||||
@ -116,7 +116,7 @@ type Lease interface {
|
||||
// KeepAlive keeps the given lease alive forever.
|
||||
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
|
||||
|
||||
// KeepAliveOnce renews the lease once. In most of the cases, Keepalive
|
||||
// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
|
||||
// should be used instead of KeepAliveOnce.
|
||||
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
|
||||
|
||||
@ -345,7 +345,7 @@ func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-cha
|
||||
}
|
||||
}
|
||||
|
||||
// closeRequireLeader scans all keep alives for ctxs that have require leader
|
||||
// closeRequireLeader scans keepAlives for ctxs that have require leader
|
||||
// and closes the associated channels.
|
||||
func (l *lessor) closeRequireLeader() {
|
||||
l.mu.Lock()
|
||||
@ -457,7 +457,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
|
||||
}
|
||||
}
|
||||
|
||||
// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
|
||||
// resetRecv opens a new lease stream and starts sending keep alive requests.
|
||||
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
|
||||
sctx, cancel := context.WithCancel(l.stopCtx)
|
||||
stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
|
||||
@ -536,7 +536,7 @@ func (l *lessor) deadlineLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
// sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream
|
||||
// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
|
||||
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
|
||||
for {
|
||||
var tosend []LeaseID
|
||||
|
@ -285,7 +285,7 @@ func (lc *leaseCache) evalOps(ops []v3.Op) ([]*v3pb.ResponseOp, bool) {
|
||||
resps := make([]*v3pb.ResponseOp, len(ops))
|
||||
for i, op := range ops {
|
||||
if !op.IsGet() || isBadOp(op) {
|
||||
// TODO: support read-only txns
|
||||
// TODO: support read-only Txn
|
||||
return nil, false
|
||||
}
|
||||
lk := lc.entries[string(op.KeyBytes())]
|
||||
|
@ -12,34 +12,35 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package leasing is a clientv3 wrapper that provides the client exclusive write access to a key by acquiring a lease and be lineraizably
|
||||
// served locally. This leasing layer can either directly wrap the etcd client or
|
||||
// it can be exposed through the etcd grace proxy server, granting multiple clients write access.
|
||||
// Package leasing serves linearizable reads from a local cache by acquiring
|
||||
// exclusive write access to keys through a client-side leasing protocol. This
|
||||
// leasing layer can either directly wrap the etcd client or it can be exposed
|
||||
// through the etcd grpc proxy server, granting multiple clients write access.
|
||||
//
|
||||
// First, create a leasing client interface:
|
||||
// First, create a leasing KV from a clientv3.Client 'cli':
|
||||
//
|
||||
// leasingCli,error = leasing.NewKV(cli.KV, "leasing-prefix")
|
||||
// if error != nil {
|
||||
// //handle error
|
||||
// }
|
||||
// lkv, err := leasing.NewKV(cli, "leasing-prefix")
|
||||
// if err != nil {
|
||||
// // handle error
|
||||
// }
|
||||
//
|
||||
// The first range request acquires the lease by adding the leasing key ("leasing-prefix"/key) on the server and stores the key locally.
|
||||
// Further linearized read requests using 'cli.leasing' will be served locally as long as the lease exists:
|
||||
// cli.Put(context.TODO(), "abc", "123")
|
||||
// A range request for a key "abc" tries to acquire a leasing key so it can cache the range's
|
||||
// key locally. On the server, the leasing key is stored to "leasing-prefix/abc":
|
||||
//
|
||||
// Lease Acquisition:
|
||||
// leasingCli.Get(context.TODO(), "abc")
|
||||
// resp, err := lkv.Get(context.TODO(), "abc")
|
||||
//
|
||||
// Local reads:
|
||||
// resp,_ := leasingCli.Get(context.TODO(), "abc")
|
||||
// fmt.Printf("%s\n", resp.Kvs[0].Value)
|
||||
// //Output: 123 (served locally)
|
||||
// Future linearized read requests using 'lkv' will be served locally for the lease's lifetime:
|
||||
//
|
||||
// Lease Revocation:
|
||||
// If a client writes to the key owned by the leasing client,then the leasing client gives up its lease allowing the client to modify the key.
|
||||
// cli.Put(context.TODO(), "abc", "456")
|
||||
// resp, _ = leasingCli.Get("abc")
|
||||
// fmt.Printf("%s\n", resp.Kvs[0].Value)
|
||||
// // Output: 456 (fetched from server)
|
||||
// resp, err = lkv.Get(context.TODO(), "abc")
|
||||
//
|
||||
// If another leasing client writes to a leased key, then the owner relinquishes its exclusive
|
||||
// access, permitting the writer to modify the key:
|
||||
//
|
||||
// lkv2, err := leasing.NewKV(cli, "leasing-prefix")
|
||||
// if err != nil {
|
||||
// // handle error
|
||||
// }
|
||||
// lkv2.Put(context.TODO(), "abc", "456")
|
||||
// resp, err = lkv.Get("abc")
|
||||
//
|
||||
package leasing
|
||||
|
@ -39,7 +39,7 @@ type Maintenance interface {
|
||||
// AlarmDisarm disarms a given alarm.
|
||||
AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)
|
||||
|
||||
// Defragment defragments storage backend of the etcd member with given endpoint.
|
||||
// Defragment releases wasted space from internal fragmentation on a given etcd member.
|
||||
// Defragment is only needed when deleting a large number of keys and want to reclaim
|
||||
// the resources.
|
||||
// Defragment is an expensive operation. User should avoid defragmenting multiple members
|
||||
@ -56,7 +56,7 @@ type Maintenance interface {
|
||||
// is non-zero, the hash is computed on all keys at or below the given revision.
|
||||
HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)
|
||||
|
||||
// Snapshot provides a reader for a snapshot of a backend.
|
||||
// Snapshot provides a reader for a point-in-time snapshot of etcd.
|
||||
Snapshot(ctx context.Context) (io.ReadCloser, error)
|
||||
|
||||
// MoveLeader requests current leader to transfer its leadership to the transferee.
|
||||
|
@ -83,7 +83,7 @@ func TestGRPCResolver(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestGRPCResolverMultiInit ensures the resolver will initialize
|
||||
// TestGRPCResolverMulti ensures the resolver will initialize
|
||||
// correctly with multiple hosts and correctly receive multiple
|
||||
// updates in a single revision.
|
||||
func TestGRPCResolverMulti(t *testing.T) {
|
||||
|
@ -75,7 +75,7 @@ type Op struct {
|
||||
elseOps []Op
|
||||
}
|
||||
|
||||
// accesors / mutators
|
||||
// accessors / mutators
|
||||
|
||||
func (op Op) IsTxn() bool { return op.t == tTxn }
|
||||
func (op Op) Txn() ([]Cmp, []Op, []Op) { return op.cmps, op.thenOps, op.elseOps }
|
||||
@ -104,30 +104,24 @@ func (op Op) IsDelete() bool { return op.t == tDeleteRange }
|
||||
// IsSerializable returns true if the serializable field is true.
|
||||
func (op Op) IsSerializable() bool { return op.serializable == true }
|
||||
|
||||
// IsKeysOnly returns true if the keysonly field is true.
|
||||
// IsKeysOnly returns whether keysOnly is set.
|
||||
func (op Op) IsKeysOnly() bool { return op.keysOnly == true }
|
||||
|
||||
// IsCountOnly returns true if the countonly field is true.
|
||||
// IsCountOnly returns whether countOnly is set.
|
||||
func (op Op) IsCountOnly() bool { return op.countOnly == true }
|
||||
|
||||
// MinModRev returns if field is populated.
|
||||
// MinModRev returns the operation's minimum modify revision.
|
||||
func (op Op) MinModRev() int64 { return op.minModRev }
|
||||
|
||||
// MaxModRev returns if field is populated.
|
||||
// MaxModRev returns the operation's maximum modify revision.
|
||||
func (op Op) MaxModRev() int64 { return op.maxModRev }
|
||||
|
||||
// MinCreateRev returns if field is populated.
|
||||
// MinCreateRev returns the operation's minimum create revision.
|
||||
func (op Op) MinCreateRev() int64 { return op.minCreateRev }
|
||||
|
||||
// MaxCreateRev returns if field is populated.
|
||||
// MaxCreateRev returns the operation's maximum create revision.
|
||||
func (op Op) MaxCreateRev() int64 { return op.maxCreateRev }
|
||||
|
||||
// Limit returns if field is populated.
|
||||
func (op Op) retLimit() int64 { return op.limit }
|
||||
|
||||
// Sort returns if field is populated.
|
||||
func (op Op) retSort() bool { return op.sort != nil }
|
||||
|
||||
// WithRangeBytes sets the byte slice for the Op's range end.
|
||||
func (op *Op) WithRangeBytes(end []byte) { op.end = end }
|
||||
|
||||
@ -330,9 +324,9 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
|
||||
if target == SortByKey && order == SortAscend {
|
||||
// If order != SortNone, server fetches the entire key-space,
|
||||
// and then applies the sort and limit, if provided.
|
||||
// Since current mvcc.Range implementation returns results
|
||||
// sorted by keys in lexicographically ascending order,
|
||||
// client should ignore SortOrder if the target is SortByKey.
|
||||
// Since by default the server returns results sorted by keys
|
||||
// in lexicographically ascending order, the client should ignore
|
||||
// SortOrder if the target is SortByKey.
|
||||
order = SortNone
|
||||
}
|
||||
op.sort = &SortOption{target, order}
|
||||
@ -473,7 +467,7 @@ func WithPrevKV() OpOption {
|
||||
}
|
||||
|
||||
// WithIgnoreValue updates the key using its current value.
|
||||
// Empty value should be passed when ignore_value is set.
|
||||
// This option can not be combined with non-empty values.
|
||||
// Returns an error if the key does not exist.
|
||||
func WithIgnoreValue() OpOption {
|
||||
return func(op *Op) {
|
||||
@ -482,7 +476,7 @@ func WithIgnoreValue() OpOption {
|
||||
}
|
||||
|
||||
// WithIgnoreLease updates the key using its current lease.
|
||||
// Empty lease should be passed when ignore_lease is set.
|
||||
// This option can not be combined with WithLease.
|
||||
// Returns an error if the key does not exist.
|
||||
func WithIgnoreLease() OpOption {
|
||||
return func(op *Op) {
|
||||
@ -507,8 +501,7 @@ func (op *LeaseOp) applyOpts(opts []LeaseOption) {
|
||||
}
|
||||
}
|
||||
|
||||
// WithAttachedKeys requests lease timetolive API to return
|
||||
// attached keys of given lease ID.
|
||||
// WithAttachedKeys makes TimeToLive list the keys attached to the given lease ID.
|
||||
func WithAttachedKeys() LeaseOption {
|
||||
return func(op *LeaseOp) { op.attachedKeys = true }
|
||||
}
|
||||
|
@ -36,11 +36,12 @@ func NewOrderViolationSwitchEndpointClosure(c clientv3.Client) OrderViolationFun
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
eps := c.Endpoints()
|
||||
// force client to connect to the specificied endpoint by limiting to a single endpoint
|
||||
// force client to connect to given endpoint by limiting to a single endpoint
|
||||
c.SetEndpoints(eps[violationCount%len(eps)])
|
||||
time.Sleep(1 * time.Second) // give enough time for operation
|
||||
// set available endpoints back to all endpoints in order to enure
|
||||
// that the client has access to all the endpoints.
|
||||
// give enough time for operation
|
||||
time.Sleep(1 * time.Second)
|
||||
// set available endpoints back to all endpoints in to ensure
|
||||
// the client has access to all the endpoints.
|
||||
c.SetEndpoints(eps...)
|
||||
violationCount++
|
||||
return nil
|
||||
|
@ -24,7 +24,7 @@ import (
|
||||
|
||||
// Txn is the interface that wraps mini-transactions.
|
||||
//
|
||||
// Tx.If(
|
||||
// Txn(context.TODO()).If(
|
||||
// Compare(Value(k1), ">", v1),
|
||||
// Compare(Version(k1), "=", 2)
|
||||
// ).Then(
|
||||
|
@ -135,7 +135,7 @@ type watchGrpcStream struct {
|
||||
respc chan *pb.WatchResponse
|
||||
// donec closes to broadcast shutdown
|
||||
donec chan struct{}
|
||||
// errc transmits errors from grpc Recv to the watch stream reconn logic
|
||||
// errc transmits errors from grpc Recv to the watch stream reconnect logic
|
||||
errc chan error
|
||||
// closingc gets the watcherStream of closing watchers
|
||||
closingc chan *watcherStream
|
||||
@ -434,7 +434,7 @@ func (w *watchGrpcStream) run() {
|
||||
initReq: *wreq,
|
||||
id: -1,
|
||||
outc: outc,
|
||||
// unbufffered so resumes won't cause repeat events
|
||||
// unbuffered so resumes won't cause repeat events
|
||||
recvc: make(chan *WatchResponse),
|
||||
}
|
||||
|
||||
@ -486,7 +486,7 @@ func (w *watchGrpcStream) run() {
|
||||
req := &pb.WatchRequest{RequestUnion: cr}
|
||||
wc.Send(req)
|
||||
}
|
||||
// watch client failed to recv; spawn another if possible
|
||||
// watch client failed on Recv; spawn another if possible
|
||||
case err := <-w.errc:
|
||||
if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
|
||||
closeErr = err
|
||||
@ -748,7 +748,7 @@ func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan str
|
||||
return donec
|
||||
}
|
||||
|
||||
// joinSubstream waits for all substream goroutines to complete
|
||||
// joinSubstreams waits for all substream goroutines to complete.
|
||||
func (w *watchGrpcStream) joinSubstreams() {
|
||||
for _, ws := range w.substreams {
|
||||
<-ws.donec
|
||||
@ -760,7 +760,7 @@ func (w *watchGrpcStream) joinSubstreams() {
|
||||
}
|
||||
}
|
||||
|
||||
// openWatchClient retries opening a watchclient until retryConnection fails
|
||||
// openWatchClient retries opening a watch client until success or halt.
|
||||
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
|
||||
for {
|
||||
select {
|
||||
@ -781,7 +781,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
|
||||
return ws, nil
|
||||
}
|
||||
|
||||
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
||||
// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
|
||||
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
||||
req := &pb.WatchCreateRequest{
|
||||
StartRevision: wr.rev,
|
||||
|
6
test
6
test
@ -333,6 +333,12 @@ function fmt_pass {
|
||||
echo -e "goword checking failed:\n${gowordRes}"
|
||||
exit 255
|
||||
fi
|
||||
# check some spelling
|
||||
gowordRes=$(goword -ignore-file=.words clientv3/{*,*/*}.go 2>&1 | grep spell | sort)
|
||||
if [ ! -z "$gowordRes" ]; then
|
||||
echo -e "goword checking failed:\n${gowordRes}"
|
||||
exit 255
|
||||
fi
|
||||
else
|
||||
echo "Skipping goword..."
|
||||
fi
|
||||
|
Loading…
x
Reference in New Issue
Block a user