mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17582 from ivanvc/address-client-var-naming-lint-rule
client: address golangci var-naming issues
This commit is contained in:
commit
d639abe1aa
@ -17,19 +17,23 @@ package logutil
|
|||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
JsonLogFormat = "json"
|
JSONLogFormat = "json"
|
||||||
ConsoleLogFormat = "console"
|
ConsoleLogFormat = "console"
|
||||||
|
//revive:disable:var-naming
|
||||||
|
// Deprecated: Please use JSONLogFormat.
|
||||||
|
JsonLogFormat = JSONLogFormat
|
||||||
|
//revive:enable:var-naming
|
||||||
)
|
)
|
||||||
|
|
||||||
var DefaultLogFormat = JsonLogFormat
|
var DefaultLogFormat = JSONLogFormat
|
||||||
|
|
||||||
// ConvertToZapFormat converts and validated log format string.
|
// ConvertToZapFormat converts and validated log format string.
|
||||||
func ConvertToZapFormat(format string) (string, error) {
|
func ConvertToZapFormat(format string) (string, error) {
|
||||||
switch format {
|
switch format {
|
||||||
case ConsoleLogFormat:
|
case ConsoleLogFormat:
|
||||||
return ConsoleLogFormat, nil
|
return ConsoleLogFormat, nil
|
||||||
case JsonLogFormat:
|
case JSONLogFormat:
|
||||||
return JsonLogFormat, nil
|
return JSONLogFormat, nil
|
||||||
case "":
|
case "":
|
||||||
return DefaultLogFormat, nil
|
return DefaultLogFormat, nil
|
||||||
default:
|
default:
|
||||||
|
@ -24,9 +24,9 @@ func TestLogFormat(t *testing.T) {
|
|||||||
want string
|
want string
|
||||||
errExpected bool
|
errExpected bool
|
||||||
}{
|
}{
|
||||||
{"json", JsonLogFormat, false},
|
{"json", JSONLogFormat, false},
|
||||||
{"console", ConsoleLogFormat, false},
|
{"console", ConsoleLogFormat, false},
|
||||||
{"", JsonLogFormat, false},
|
{"", JSONLogFormat, false},
|
||||||
{"konsole", "", true},
|
{"konsole", "", true},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,13 +26,13 @@ func TestGetCipherSuite_not_existing(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func CipherSuiteExpectedToExist(tb testing.TB, cipher string, expectedId uint16) {
|
func CipherSuiteExpectedToExist(tb testing.TB, cipher string, expectedID uint16) {
|
||||||
vid, ok := GetCipherSuite(cipher)
|
vid, ok := GetCipherSuite(cipher)
|
||||||
if !ok {
|
if !ok {
|
||||||
tb.Errorf("Expected %v cipher to exist", cipher)
|
tb.Errorf("Expected %v cipher to exist", cipher)
|
||||||
}
|
}
|
||||||
if vid != expectedId {
|
if vid != expectedID {
|
||||||
tb.Errorf("For %v expected=%v found=%v", cipher, expectedId, vid)
|
tb.Errorf("For %v expected=%v found=%v", cipher, expectedID, vid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,54 +20,54 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ENV_VERIFY = "ETCD_VERIFY"
|
const envVerify = "ETCD_VERIFY"
|
||||||
|
|
||||||
type VerificationType string
|
type VerificationType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ENV_VERIFY_VALUE_ALL VerificationType = "all"
|
envVerifyValueAll VerificationType = "all"
|
||||||
ENV_VERIFY_VALUE_ASSERT VerificationType = "assert"
|
envVerifyValueAssert VerificationType = "assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getEnvVerify() string {
|
func getEnvVerify() string {
|
||||||
return strings.ToLower(os.Getenv(ENV_VERIFY))
|
return strings.ToLower(os.Getenv(envVerify))
|
||||||
}
|
}
|
||||||
|
|
||||||
func IsVerificationEnabled(verification VerificationType) bool {
|
func IsVerificationEnabled(verification VerificationType) bool {
|
||||||
env := getEnvVerify()
|
env := getEnvVerify()
|
||||||
return env == string(ENV_VERIFY_VALUE_ALL) || env == strings.ToLower(string(verification))
|
return env == string(envVerifyValueAll) || env == strings.ToLower(string(verification))
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnableVerifications sets `ENV_VERIFY` and returns a function that
|
// EnableVerifications sets `envVerify` and returns a function that
|
||||||
// can be used to bring the original settings.
|
// can be used to bring the original settings.
|
||||||
func EnableVerifications(verification VerificationType) func() {
|
func EnableVerifications(verification VerificationType) func() {
|
||||||
previousEnv := getEnvVerify()
|
previousEnv := getEnvVerify()
|
||||||
os.Setenv(ENV_VERIFY, string(verification))
|
os.Setenv(envVerify, string(verification))
|
||||||
return func() {
|
return func() {
|
||||||
os.Setenv(ENV_VERIFY, previousEnv)
|
os.Setenv(envVerify, previousEnv)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnableAllVerifications enables verification and returns a function
|
// EnableAllVerifications enables verification and returns a function
|
||||||
// that can be used to bring the original settings.
|
// that can be used to bring the original settings.
|
||||||
func EnableAllVerifications() func() {
|
func EnableAllVerifications() func() {
|
||||||
return EnableVerifications(ENV_VERIFY_VALUE_ALL)
|
return EnableVerifications(envVerifyValueAll)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DisableVerifications unsets `ENV_VERIFY` and returns a function that
|
// DisableVerifications unsets `envVerify` and returns a function that
|
||||||
// can be used to bring the original settings.
|
// can be used to bring the original settings.
|
||||||
func DisableVerifications() func() {
|
func DisableVerifications() func() {
|
||||||
previousEnv := getEnvVerify()
|
previousEnv := getEnvVerify()
|
||||||
os.Unsetenv(ENV_VERIFY)
|
os.Unsetenv(envVerify)
|
||||||
return func() {
|
return func() {
|
||||||
os.Setenv(ENV_VERIFY, previousEnv)
|
os.Setenv(envVerify, previousEnv)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify performs verification if the assertions are enabled.
|
// Verify performs verification if the assertions are enabled.
|
||||||
// In the default setup running in tests and skipped in the production code.
|
// In the default setup running in tests and skipped in the production code.
|
||||||
func Verify(f func()) {
|
func Verify(f func()) {
|
||||||
if IsVerificationEnabled(ENV_VERIFY_VALUE_ASSERT) {
|
if IsVerificationEnabled(envVerifyValueAssert) {
|
||||||
f()
|
f()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -345,11 +345,11 @@ func authority(endpoint string) string {
|
|||||||
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
|
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
|
||||||
r := endpoint.RequiresCredentials(ep)
|
r := endpoint.RequiresCredentials(ep)
|
||||||
switch r {
|
switch r {
|
||||||
case endpoint.CREDS_DROP:
|
case endpoint.CredsDrop:
|
||||||
return nil
|
return nil
|
||||||
case endpoint.CREDS_OPTIONAL:
|
case endpoint.CredsOptional:
|
||||||
return c.creds
|
return c.creds
|
||||||
case endpoint.CREDS_REQUIRE:
|
case endpoint.CredsRequire:
|
||||||
if c.creds != nil {
|
if c.creds != nil {
|
||||||
return c.creds
|
return c.creds
|
||||||
}
|
}
|
||||||
|
@ -25,12 +25,12 @@ import (
|
|||||||
type CredsRequirement int
|
type CredsRequirement int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// CREDS_REQUIRE - Credentials/certificate required for thi type of connection.
|
// CredsRequire - Credentials/certificate required for thi type of connection.
|
||||||
CREDS_REQUIRE CredsRequirement = iota
|
CredsRequire CredsRequirement = iota
|
||||||
// CREDS_DROP - Credentials/certificate not needed and should get ignored.
|
// CredsDrop - Credentials/certificate not needed and should get ignored.
|
||||||
CREDS_DROP
|
CredsDrop
|
||||||
// CREDS_OPTIONAL - Credentials/certificate might be used if supplied
|
// CredsOptional - Credentials/certificate might be used if supplied
|
||||||
CREDS_OPTIONAL
|
CredsOptional
|
||||||
)
|
)
|
||||||
|
|
||||||
func extractHostFromHostPort(ep string) string {
|
func extractHostFromHostPort(ep string) string {
|
||||||
@ -54,20 +54,20 @@ func mustSplit2(s, sep string) (string, string) {
|
|||||||
func schemeToCredsRequirement(schema string) CredsRequirement {
|
func schemeToCredsRequirement(schema string) CredsRequirement {
|
||||||
switch schema {
|
switch schema {
|
||||||
case "https", "unixs":
|
case "https", "unixs":
|
||||||
return CREDS_REQUIRE
|
return CredsRequire
|
||||||
case "http":
|
case "http":
|
||||||
return CREDS_DROP
|
return CredsDrop
|
||||||
case "unix":
|
case "unix":
|
||||||
// Preserving previous behavior from:
|
// Preserving previous behavior from:
|
||||||
// https://github.com/etcd-io/etcd/blob/dae29bb719dd69dc119146fc297a0628fcc1ccf8/client/v3/client.go#L212
|
// https://github.com/etcd-io/etcd/blob/dae29bb719dd69dc119146fc297a0628fcc1ccf8/client/v3/client.go#L212
|
||||||
// that likely was a bug due to missing 'fallthrough'.
|
// that likely was a bug due to missing 'fallthrough'.
|
||||||
// At the same time it seems legit to let the users decide whether they
|
// At the same time it seems legit to let the users decide whether they
|
||||||
// want credential control or not (and 'unixs' schema is not a standard thing).
|
// want credential control or not (and 'unixs' schema is not a standard thing).
|
||||||
return CREDS_OPTIONAL
|
return CredsOptional
|
||||||
case "":
|
case "":
|
||||||
return CREDS_OPTIONAL
|
return CredsOptional
|
||||||
default:
|
default:
|
||||||
return CREDS_OPTIONAL
|
return CredsOptional
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,7 +106,7 @@ func translateEndpoint(ep string) (addr string, serverName string, requireCreds
|
|||||||
if strings.Contains(ep, "://") {
|
if strings.Contains(ep, "://") {
|
||||||
url, err := url.Parse(ep)
|
url, err := url.Parse(ep)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ep, ep, CREDS_OPTIONAL
|
return ep, ep, CredsOptional
|
||||||
}
|
}
|
||||||
if url.Scheme == "http" || url.Scheme == "https" {
|
if url.Scheme == "http" || url.Scheme == "https" {
|
||||||
return url.Host, url.Host, schemeToCredsRequirement(url.Scheme)
|
return url.Host, url.Host, schemeToCredsRequirement(url.Scheme)
|
||||||
@ -114,7 +114,7 @@ func translateEndpoint(ep string) (addr string, serverName string, requireCreds
|
|||||||
return ep, url.Host, schemeToCredsRequirement(url.Scheme)
|
return ep, url.Host, schemeToCredsRequirement(url.Scheme)
|
||||||
}
|
}
|
||||||
// Handles plain addresses like 10.0.0.44:437.
|
// Handles plain addresses like 10.0.0.44:437.
|
||||||
return ep, ep, CREDS_OPTIONAL
|
return ep, ep, CredsOptional
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequiresCredentials returns whether given endpoint requires
|
// RequiresCredentials returns whether given endpoint requires
|
||||||
|
@ -25,38 +25,38 @@ func Test_interpret(t *testing.T) {
|
|||||||
wantServerName string
|
wantServerName string
|
||||||
wantRequiresCreds CredsRequirement
|
wantRequiresCreds CredsRequirement
|
||||||
}{
|
}{
|
||||||
{"127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
|
{"127.0.0.1", "127.0.0.1", "127.0.0.1", CredsOptional},
|
||||||
{"localhost", "localhost", "localhost", CREDS_OPTIONAL},
|
{"localhost", "localhost", "localhost", CredsOptional},
|
||||||
{"localhost:8080", "localhost:8080", "localhost:8080", CREDS_OPTIONAL},
|
{"localhost:8080", "localhost:8080", "localhost:8080", CredsOptional},
|
||||||
|
|
||||||
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
|
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CredsOptional},
|
||||||
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CREDS_OPTIONAL},
|
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CredsOptional},
|
||||||
|
|
||||||
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
|
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CredsOptional},
|
||||||
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CREDS_OPTIONAL},
|
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CredsOptional},
|
||||||
|
|
||||||
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
|
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CredsRequire},
|
||||||
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CREDS_REQUIRE},
|
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CredsRequire},
|
||||||
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
|
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CredsRequire},
|
||||||
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CREDS_REQUIRE},
|
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CredsRequire},
|
||||||
|
|
||||||
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_DROP},
|
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1", CredsDrop},
|
||||||
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1:8080", CREDS_DROP},
|
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1:8080", CredsDrop},
|
||||||
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
|
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1", CredsRequire},
|
||||||
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1:8080", CREDS_REQUIRE},
|
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1:8080", CredsRequire},
|
||||||
{"https://localhost:20000", "localhost:20000", "localhost:20000", CREDS_REQUIRE},
|
{"https://localhost:20000", "localhost:20000", "localhost:20000", CredsRequire},
|
||||||
|
|
||||||
{"unix:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_OPTIONAL},
|
{"unix:///tmp/abc", "unix:///tmp/abc", "abc", CredsOptional},
|
||||||
{"unixs:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_REQUIRE},
|
{"unixs:///tmp/abc", "unix:///tmp/abc", "abc", CredsRequire},
|
||||||
{"unix:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc:1234", CREDS_OPTIONAL},
|
{"unix:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc:1234", CredsOptional},
|
||||||
{"unixs:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc:1234", CREDS_REQUIRE},
|
{"unixs:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc:1234", CredsRequire},
|
||||||
{"etcd.io", "etcd.io", "etcd.io", CREDS_OPTIONAL},
|
{"etcd.io", "etcd.io", "etcd.io", CredsOptional},
|
||||||
{"http://etcd.io/abc", "etcd.io", "etcd.io", CREDS_DROP},
|
{"http://etcd.io/abc", "etcd.io", "etcd.io", CredsDrop},
|
||||||
{"dns://something-other", "dns://something-other", "something-other", CREDS_OPTIONAL},
|
{"dns://something-other", "dns://something-other", "something-other", CredsOptional},
|
||||||
|
|
||||||
{"http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", CREDS_DROP},
|
{"http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", CredsDrop},
|
||||||
{"[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", CREDS_OPTIONAL},
|
{"[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", CredsOptional},
|
||||||
{"unix:unexpected-file_name#123$456", "unix:unexpected-file_name#123$456", "unexpected-file_name#123$456", CREDS_OPTIONAL},
|
{"unix:unexpected-file_name#123$456", "unix:unexpected-file_name#123$456", "unexpected-file_name#123$456", CredsOptional},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run("Interpret_"+tt.endpoint, func(t *testing.T) {
|
t.Run("Interpret_"+tt.endpoint, func(t *testing.T) {
|
||||||
|
@ -32,7 +32,7 @@ type MockServer struct {
|
|||||||
ln net.Listener
|
ln net.Listener
|
||||||
Network string
|
Network string
|
||||||
Address string
|
Address string
|
||||||
GrpcServer *grpc.Server
|
GRPCServer *grpc.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MockServer) ResolverAddress() resolver.Address {
|
func (ms *MockServer) ResolverAddress() resolver.Address {
|
||||||
@ -63,7 +63,7 @@ func StartMockServers(count int) (ms *MockServers, err error) {
|
|||||||
func StartMockServersOnNetwork(count int, network string) (ms *MockServers, err error) {
|
func StartMockServersOnNetwork(count int, network string) (ms *MockServers, err error) {
|
||||||
switch network {
|
switch network {
|
||||||
case "tcp":
|
case "tcp":
|
||||||
return startMockServersTcp(count)
|
return startMockServersTCP(count)
|
||||||
case "unix":
|
case "unix":
|
||||||
return startMockServersUnix(count)
|
return startMockServersUnix(count)
|
||||||
default:
|
default:
|
||||||
@ -71,7 +71,7 @@ func StartMockServersOnNetwork(count int, network string) (ms *MockServers, err
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func startMockServersTcp(count int) (ms *MockServers, err error) {
|
func startMockServersTCP(count int) (ms *MockServers, err error) {
|
||||||
addrs := make([]string, 0, count)
|
addrs := make([]string, 0, count)
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
addrs = append(addrs, "localhost:0")
|
addrs = append(addrs, "localhost:0")
|
||||||
@ -133,12 +133,12 @@ func (ms *MockServers) StartAt(idx int) (err error) {
|
|||||||
svr := grpc.NewServer()
|
svr := grpc.NewServer()
|
||||||
pb.RegisterKVServer(svr, &mockKVServer{})
|
pb.RegisterKVServer(svr, &mockKVServer{})
|
||||||
pb.RegisterLeaseServer(svr, &mockLeaseServer{})
|
pb.RegisterLeaseServer(svr, &mockLeaseServer{})
|
||||||
ms.Servers[idx].GrpcServer = svr
|
ms.Servers[idx].GRPCServer = svr
|
||||||
|
|
||||||
ms.wg.Add(1)
|
ms.wg.Add(1)
|
||||||
go func(svr *grpc.Server, l net.Listener) {
|
go func(svr *grpc.Server, l net.Listener) {
|
||||||
svr.Serve(l)
|
svr.Serve(l)
|
||||||
}(ms.Servers[idx].GrpcServer, ms.Servers[idx].ln)
|
}(ms.Servers[idx].GRPCServer, ms.Servers[idx].ln)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -151,8 +151,8 @@ func (ms *MockServers) StopAt(idx int) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ms.Servers[idx].GrpcServer.Stop()
|
ms.Servers[idx].GRPCServer.Stop()
|
||||||
ms.Servers[idx].GrpcServer = nil
|
ms.Servers[idx].GRPCServer = nil
|
||||||
ms.Servers[idx].ln = nil
|
ms.Servers[idx].ln = nil
|
||||||
ms.wg.Done()
|
ms.wg.Done()
|
||||||
}
|
}
|
||||||
|
@ -311,7 +311,7 @@ func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) erro
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
return contextErrToGrpcErr(ctx.Err())
|
return contextErrToGRPCErr(ctx.Err())
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -349,7 +349,7 @@ func isContextError(err error) bool {
|
|||||||
return status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Canceled
|
return status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Canceled
|
||||||
}
|
}
|
||||||
|
|
||||||
func contextErrToGrpcErr(err error) error {
|
func contextErrToGRPCErr(err error) error {
|
||||||
switch err {
|
switch err {
|
||||||
case context.DeadlineExceeded:
|
case context.DeadlineExceeded:
|
||||||
return status.Errorf(codes.DeadlineExceeded, err.Error())
|
return status.Errorf(codes.DeadlineExceeded, err.Error())
|
||||||
|
@ -149,12 +149,12 @@ type watcher struct {
|
|||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
// streams holds all the active grpc streams keyed by ctx value.
|
// streams holds all the active grpc streams keyed by ctx value.
|
||||||
streams map[string]*watchGrpcStream
|
streams map[string]*watchGRPCStream
|
||||||
lg *zap.Logger
|
lg *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
|
// watchGRPCStream tracks all watch resources attached to a single grpc stream.
|
||||||
type watchGrpcStream struct {
|
type watchGRPCStream struct {
|
||||||
owner *watcher
|
owner *watcher
|
||||||
remote pb.WatchClient
|
remote pb.WatchClient
|
||||||
callOpts []grpc.CallOption
|
callOpts []grpc.CallOption
|
||||||
@ -251,7 +251,7 @@ func NewWatcher(c *Client) Watcher {
|
|||||||
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
|
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
|
||||||
w := &watcher{
|
w := &watcher{
|
||||||
remote: wc,
|
remote: wc,
|
||||||
streams: make(map[string]*watchGrpcStream),
|
streams: make(map[string]*watchGRPCStream),
|
||||||
}
|
}
|
||||||
if c != nil {
|
if c != nil {
|
||||||
w.callOpts = c.callOpts
|
w.callOpts = c.callOpts
|
||||||
@ -271,9 +271,9 @@ func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
|
|||||||
func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
|
func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
|
||||||
func (vc *valCtx) Err() error { return nil }
|
func (vc *valCtx) Err() error { return nil }
|
||||||
|
|
||||||
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
|
func (w *watcher) newWatcherGRPCStream(inctx context.Context) *watchGRPCStream {
|
||||||
ctx, cancel := context.WithCancel(&valCtx{inctx})
|
ctx, cancel := context.WithCancel(&valCtx{inctx})
|
||||||
wgs := &watchGrpcStream{
|
wgs := &watchGRPCStream{
|
||||||
owner: w,
|
owner: w,
|
||||||
remote: w.remote,
|
remote: w.remote,
|
||||||
callOpts: w.callOpts,
|
callOpts: w.callOpts,
|
||||||
@ -334,7 +334,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
|
|||||||
}
|
}
|
||||||
wgs := w.streams[ctxKey]
|
wgs := w.streams[ctxKey]
|
||||||
if wgs == nil {
|
if wgs == nil {
|
||||||
wgs = w.newWatcherGrpcStream(ctx)
|
wgs = w.newWatcherGRPCStream(ctx)
|
||||||
w.streams[ctxKey] = wgs
|
w.streams[ctxKey] = wgs
|
||||||
}
|
}
|
||||||
donec := wgs.donec
|
donec := wgs.donec
|
||||||
@ -412,7 +412,7 @@ func (w *watcher) RequestProgress(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
wgs := w.streams[ctxKey]
|
wgs := w.streams[ctxKey]
|
||||||
if wgs == nil {
|
if wgs == nil {
|
||||||
wgs = w.newWatcherGrpcStream(ctx)
|
wgs = w.newWatcherGRPCStream(ctx)
|
||||||
w.streams[ctxKey] = wgs
|
w.streams[ctxKey] = wgs
|
||||||
}
|
}
|
||||||
donec := wgs.donec
|
donec := wgs.donec
|
||||||
@ -435,7 +435,7 @@ func (w *watcher) RequestProgress(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchGrpcStream) close() (err error) {
|
func (w *watchGRPCStream) close() (err error) {
|
||||||
w.cancel()
|
w.cancel()
|
||||||
<-w.donec
|
<-w.donec
|
||||||
select {
|
select {
|
||||||
@ -445,7 +445,7 @@ func (w *watchGrpcStream) close() (err error) {
|
|||||||
return toErr(w.ctx, err)
|
return toErr(w.ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watcher) closeStream(wgs *watchGrpcStream) {
|
func (w *watcher) closeStream(wgs *watchGRPCStream) {
|
||||||
w.mu.Lock()
|
w.mu.Lock()
|
||||||
close(wgs.donec)
|
close(wgs.donec)
|
||||||
wgs.cancel()
|
wgs.cancel()
|
||||||
@ -455,7 +455,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
|
|||||||
w.mu.Unlock()
|
w.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
|
func (w *watchGRPCStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
|
||||||
// check watch ID for backward compatibility (<= v3.3)
|
// check watch ID for backward compatibility (<= v3.3)
|
||||||
if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
|
if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
|
||||||
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
|
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
|
||||||
@ -467,7 +467,7 @@ func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream
|
|||||||
w.substreams[ws.id] = ws
|
w.substreams[ws.id] = ws
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
|
func (w *watchGRPCStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
|
||||||
select {
|
select {
|
||||||
case ws.outc <- *resp:
|
case ws.outc <- *resp:
|
||||||
case <-ws.initReq.ctx.Done():
|
case <-ws.initReq.ctx.Done():
|
||||||
@ -476,7 +476,7 @@ func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchRespo
|
|||||||
close(ws.outc)
|
close(ws.outc)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
|
func (w *watchGRPCStream) closeSubstream(ws *watcherStream) {
|
||||||
// send channel response in case stream was never established
|
// send channel response in case stream was never established
|
||||||
select {
|
select {
|
||||||
case ws.initReq.retc <- ws.outc:
|
case ws.initReq.retc <- ws.outc:
|
||||||
@ -501,7 +501,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// run is the root of the goroutines for managing a watcher client
|
// run is the root of the goroutines for managing a watcher client
|
||||||
func (w *watchGrpcStream) run() {
|
func (w *watchGRPCStream) run() {
|
||||||
var wc pb.Watch_WatchClient
|
var wc pb.Watch_WatchClient
|
||||||
var closeErr error
|
var closeErr error
|
||||||
|
|
||||||
@ -699,7 +699,7 @@ func (w *watchGrpcStream) run() {
|
|||||||
|
|
||||||
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
|
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
|
||||||
// streams are marked as nil in the queue since the head must wait for its inflight registration.
|
// streams are marked as nil in the queue since the head must wait for its inflight registration.
|
||||||
func (w *watchGrpcStream) nextResume() *watcherStream {
|
func (w *watchGRPCStream) nextResume() *watcherStream {
|
||||||
for len(w.resuming) != 0 {
|
for len(w.resuming) != 0 {
|
||||||
if w.resuming[0] != nil {
|
if w.resuming[0] != nil {
|
||||||
return w.resuming[0]
|
return w.resuming[0]
|
||||||
@ -710,7 +710,7 @@ func (w *watchGrpcStream) nextResume() *watcherStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// dispatchEvent sends a WatchResponse to the appropriate watcher stream
|
// dispatchEvent sends a WatchResponse to the appropriate watcher stream
|
||||||
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
func (w *watchGRPCStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
||||||
events := make([]*Event, len(pbresp.Events))
|
events := make([]*Event, len(pbresp.Events))
|
||||||
for i, ev := range pbresp.Events {
|
for i, ev := range pbresp.Events {
|
||||||
events[i] = (*Event)(ev)
|
events[i] = (*Event)(ev)
|
||||||
@ -736,7 +736,7 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// broadcastResponse send a watch response to all watch substreams.
|
// broadcastResponse send a watch response to all watch substreams.
|
||||||
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
|
func (w *watchGRPCStream) broadcastResponse(wr *WatchResponse) bool {
|
||||||
for _, ws := range w.substreams {
|
for _, ws := range w.substreams {
|
||||||
select {
|
select {
|
||||||
case ws.recvc <- wr:
|
case ws.recvc <- wr:
|
||||||
@ -747,8 +747,8 @@ func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// unicastResponse sends a watch response to a specific watch substream.
|
// unicastResponse sends a watch response to a specific watch substream.
|
||||||
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
|
func (w *watchGRPCStream) unicastResponse(wr *WatchResponse, watchID int64) bool {
|
||||||
ws, ok := w.substreams[watchId]
|
ws, ok := w.substreams[watchID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -761,7 +761,7 @@ func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool
|
|||||||
}
|
}
|
||||||
|
|
||||||
// serveWatchClient forwards messages from the grpc stream to run()
|
// serveWatchClient forwards messages from the grpc stream to run()
|
||||||
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
|
func (w *watchGRPCStream) serveWatchClient(wc pb.Watch_WatchClient) {
|
||||||
for {
|
for {
|
||||||
resp, err := wc.Recv()
|
resp, err := wc.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -780,7 +780,7 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// serveSubstream forwards watch responses from run() to the subscriber
|
// serveSubstream forwards watch responses from run() to the subscriber
|
||||||
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
|
func (w *watchGRPCStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
|
||||||
if ws.closing {
|
if ws.closing {
|
||||||
panic("created substream goroutine but substream is closing")
|
panic("created substream goroutine but substream is closing")
|
||||||
}
|
}
|
||||||
@ -877,7 +877,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
|
|||||||
// lazily send cancel message if events on missing id
|
// lazily send cancel message if events on missing id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
func (w *watchGRPCStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
||||||
// mark all substreams as resuming
|
// mark all substreams as resuming
|
||||||
close(w.resumec)
|
close(w.resumec)
|
||||||
w.resumec = make(chan struct{})
|
w.resumec = make(chan struct{})
|
||||||
@ -923,7 +923,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
|||||||
return wc, nil
|
return wc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
|
func (w *watchGRPCStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(len(w.resuming))
|
wg.Add(len(w.resuming))
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
@ -960,7 +960,7 @@ func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan str
|
|||||||
}
|
}
|
||||||
|
|
||||||
// joinSubstreams waits for all substream goroutines to complete.
|
// joinSubstreams waits for all substream goroutines to complete.
|
||||||
func (w *watchGrpcStream) joinSubstreams() {
|
func (w *watchGRPCStream) joinSubstreams() {
|
||||||
for _, ws := range w.substreams {
|
for _, ws := range w.substreams {
|
||||||
<-ws.donec
|
<-ws.donec
|
||||||
}
|
}
|
||||||
@ -973,7 +973,7 @@ func (w *watchGrpcStream) joinSubstreams() {
|
|||||||
|
|
||||||
var maxBackoff = 100 * time.Millisecond
|
var maxBackoff = 100 * time.Millisecond
|
||||||
|
|
||||||
func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration {
|
func (w *watchGRPCStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration {
|
||||||
if isUnavailableErr(w.ctx, err) {
|
if isUnavailableErr(w.ctx, err) {
|
||||||
// retry, but backoff
|
// retry, but backoff
|
||||||
if backoff < maxBackoff {
|
if backoff < maxBackoff {
|
||||||
@ -991,7 +991,7 @@ func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error)
|
|||||||
// openWatchClient retries opening a watch client until success or halt.
|
// openWatchClient retries opening a watch client until success or halt.
|
||||||
// manually retry in case "ws==nil && err==nil"
|
// manually retry in case "ws==nil && err==nil"
|
||||||
// TODO: remove FailFast=false
|
// TODO: remove FailFast=false
|
||||||
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
|
func (w *watchGRPCStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
|
||||||
backoff := time.Millisecond
|
backoff := time.Millisecond
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -25,14 +25,14 @@ import (
|
|||||||
"go.etcd.io/etcd/client/v3/concurrency"
|
"go.etcd.io/etcd/client/v3/concurrency"
|
||||||
)
|
)
|
||||||
|
|
||||||
func mockElection_Campaign() {
|
func mockElectionCampaign() {
|
||||||
fmt.Println("completed first election with e2")
|
fmt.Println("completed first election with e2")
|
||||||
fmt.Println("completed second election with e1")
|
fmt.Println("completed second election with e1")
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExampleElection_Campaign() {
|
func ExampleElection_Campaign() {
|
||||||
forUnitTestsRunInMockedContext(
|
forUnitTestsRunInMockedContext(
|
||||||
mockElection_Campaign,
|
mockElectionCampaign,
|
||||||
func() {
|
func() {
|
||||||
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -23,7 +23,7 @@ import (
|
|||||||
"go.etcd.io/etcd/client/v3/concurrency"
|
"go.etcd.io/etcd/client/v3/concurrency"
|
||||||
)
|
)
|
||||||
|
|
||||||
func mockMutex_TryLock() {
|
func mockMutexTryLock() {
|
||||||
fmt.Println("acquired lock for s1")
|
fmt.Println("acquired lock for s1")
|
||||||
fmt.Println("cannot acquire lock for s2, as already locked in another session")
|
fmt.Println("cannot acquire lock for s2, as already locked in another session")
|
||||||
fmt.Println("released lock for s1")
|
fmt.Println("released lock for s1")
|
||||||
@ -32,7 +32,7 @@ func mockMutex_TryLock() {
|
|||||||
|
|
||||||
func ExampleMutex_TryLock() {
|
func ExampleMutex_TryLock() {
|
||||||
forUnitTestsRunInMockedContext(
|
forUnitTestsRunInMockedContext(
|
||||||
mockMutex_TryLock,
|
mockMutexTryLock,
|
||||||
func() {
|
func() {
|
||||||
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -85,7 +85,7 @@ func ExampleMutex_TryLock() {
|
|||||||
// acquired lock for s2
|
// acquired lock for s2
|
||||||
}
|
}
|
||||||
|
|
||||||
func mockMutex_Lock() {
|
func mockMutexLock() {
|
||||||
fmt.Println("acquired lock for s1")
|
fmt.Println("acquired lock for s1")
|
||||||
fmt.Println("released lock for s1")
|
fmt.Println("released lock for s1")
|
||||||
fmt.Println("acquired lock for s2")
|
fmt.Println("acquired lock for s2")
|
||||||
@ -93,7 +93,7 @@ func mockMutex_Lock() {
|
|||||||
|
|
||||||
func ExampleMutex_Lock() {
|
func ExampleMutex_Lock() {
|
||||||
forUnitTestsRunInMockedContext(
|
forUnitTestsRunInMockedContext(
|
||||||
mockMutex_Lock,
|
mockMutexLock,
|
||||||
func() {
|
func() {
|
||||||
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
"go.etcd.io/etcd/client/v3/concurrency"
|
"go.etcd.io/etcd/client/v3/concurrency"
|
||||||
)
|
)
|
||||||
|
|
||||||
func mockSTM_apply() {
|
func mockSTMApply() {
|
||||||
fmt.Println("account sum is 500")
|
fmt.Println("account sum is 500")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -33,7 +33,7 @@ func mockSTM_apply() {
|
|||||||
// transfer between balances.
|
// transfer between balances.
|
||||||
func ExampleSTM_apply() {
|
func ExampleSTM_apply() {
|
||||||
forUnitTestsRunInMockedContext(
|
forUnitTestsRunInMockedContext(
|
||||||
mockSTM_apply,
|
mockSTMApply,
|
||||||
func() {
|
func() {
|
||||||
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user