mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
client: address golangci var-naming issues
Signed-off-by: Ivan Valdes <ivan@vald.es>
This commit is contained in:
parent
ddf54715bf
commit
578b784dcd
@ -17,19 +17,23 @@ package logutil
|
||||
import "fmt"
|
||||
|
||||
const (
|
||||
JsonLogFormat = "json"
|
||||
JSONLogFormat = "json"
|
||||
ConsoleLogFormat = "console"
|
||||
//revive:disable:var-naming
|
||||
// Deprecated: Please use JSONLogFormat.
|
||||
JsonLogFormat = JSONLogFormat
|
||||
//revive:enable:var-naming
|
||||
)
|
||||
|
||||
var DefaultLogFormat = JsonLogFormat
|
||||
var DefaultLogFormat = JSONLogFormat
|
||||
|
||||
// ConvertToZapFormat converts and validated log format string.
|
||||
func ConvertToZapFormat(format string) (string, error) {
|
||||
switch format {
|
||||
case ConsoleLogFormat:
|
||||
return ConsoleLogFormat, nil
|
||||
case JsonLogFormat:
|
||||
return JsonLogFormat, nil
|
||||
case JSONLogFormat:
|
||||
return JSONLogFormat, nil
|
||||
case "":
|
||||
return DefaultLogFormat, nil
|
||||
default:
|
||||
|
@ -24,9 +24,9 @@ func TestLogFormat(t *testing.T) {
|
||||
want string
|
||||
errExpected bool
|
||||
}{
|
||||
{"json", JsonLogFormat, false},
|
||||
{"json", JSONLogFormat, false},
|
||||
{"console", ConsoleLogFormat, false},
|
||||
{"", JsonLogFormat, false},
|
||||
{"", JSONLogFormat, false},
|
||||
{"konsole", "", true},
|
||||
}
|
||||
|
||||
|
@ -26,13 +26,13 @@ func TestGetCipherSuite_not_existing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func CipherSuiteExpectedToExist(tb testing.TB, cipher string, expectedId uint16) {
|
||||
func CipherSuiteExpectedToExist(tb testing.TB, cipher string, expectedID uint16) {
|
||||
vid, ok := GetCipherSuite(cipher)
|
||||
if !ok {
|
||||
tb.Errorf("Expected %v cipher to exist", cipher)
|
||||
}
|
||||
if vid != expectedId {
|
||||
tb.Errorf("For %v expected=%v found=%v", cipher, expectedId, vid)
|
||||
if vid != expectedID {
|
||||
tb.Errorf("For %v expected=%v found=%v", cipher, expectedID, vid)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,54 +20,54 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
const ENV_VERIFY = "ETCD_VERIFY"
|
||||
const envVerify = "ETCD_VERIFY"
|
||||
|
||||
type VerificationType string
|
||||
|
||||
const (
|
||||
ENV_VERIFY_VALUE_ALL VerificationType = "all"
|
||||
ENV_VERIFY_VALUE_ASSERT VerificationType = "assert"
|
||||
envVerifyValueAll VerificationType = "all"
|
||||
envVerifyValueAssert VerificationType = "assert"
|
||||
)
|
||||
|
||||
func getEnvVerify() string {
|
||||
return strings.ToLower(os.Getenv(ENV_VERIFY))
|
||||
return strings.ToLower(os.Getenv(envVerify))
|
||||
}
|
||||
|
||||
func IsVerificationEnabled(verification VerificationType) bool {
|
||||
env := getEnvVerify()
|
||||
return env == string(ENV_VERIFY_VALUE_ALL) || env == strings.ToLower(string(verification))
|
||||
return env == string(envVerifyValueAll) || env == strings.ToLower(string(verification))
|
||||
}
|
||||
|
||||
// EnableVerifications sets `ENV_VERIFY` and returns a function that
|
||||
// EnableVerifications sets `envVerify` and returns a function that
|
||||
// can be used to bring the original settings.
|
||||
func EnableVerifications(verification VerificationType) func() {
|
||||
previousEnv := getEnvVerify()
|
||||
os.Setenv(ENV_VERIFY, string(verification))
|
||||
os.Setenv(envVerify, string(verification))
|
||||
return func() {
|
||||
os.Setenv(ENV_VERIFY, previousEnv)
|
||||
os.Setenv(envVerify, previousEnv)
|
||||
}
|
||||
}
|
||||
|
||||
// EnableAllVerifications enables verification and returns a function
|
||||
// that can be used to bring the original settings.
|
||||
func EnableAllVerifications() func() {
|
||||
return EnableVerifications(ENV_VERIFY_VALUE_ALL)
|
||||
return EnableVerifications(envVerifyValueAll)
|
||||
}
|
||||
|
||||
// DisableVerifications unsets `ENV_VERIFY` and returns a function that
|
||||
// DisableVerifications unsets `envVerify` and returns a function that
|
||||
// can be used to bring the original settings.
|
||||
func DisableVerifications() func() {
|
||||
previousEnv := getEnvVerify()
|
||||
os.Unsetenv(ENV_VERIFY)
|
||||
os.Unsetenv(envVerify)
|
||||
return func() {
|
||||
os.Setenv(ENV_VERIFY, previousEnv)
|
||||
os.Setenv(envVerify, previousEnv)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify performs verification if the assertions are enabled.
|
||||
// In the default setup running in tests and skipped in the production code.
|
||||
func Verify(f func()) {
|
||||
if IsVerificationEnabled(ENV_VERIFY_VALUE_ASSERT) {
|
||||
if IsVerificationEnabled(envVerifyValueAssert) {
|
||||
f()
|
||||
}
|
||||
}
|
||||
|
@ -345,11 +345,11 @@ func authority(endpoint string) string {
|
||||
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
|
||||
r := endpoint.RequiresCredentials(ep)
|
||||
switch r {
|
||||
case endpoint.CREDS_DROP:
|
||||
case endpoint.CredsDrop:
|
||||
return nil
|
||||
case endpoint.CREDS_OPTIONAL:
|
||||
case endpoint.CredsOptional:
|
||||
return c.creds
|
||||
case endpoint.CREDS_REQUIRE:
|
||||
case endpoint.CredsRequire:
|
||||
if c.creds != nil {
|
||||
return c.creds
|
||||
}
|
||||
|
@ -25,12 +25,12 @@ import (
|
||||
type CredsRequirement int
|
||||
|
||||
const (
|
||||
// CREDS_REQUIRE - Credentials/certificate required for thi type of connection.
|
||||
CREDS_REQUIRE CredsRequirement = iota
|
||||
// CREDS_DROP - Credentials/certificate not needed and should get ignored.
|
||||
CREDS_DROP
|
||||
// CREDS_OPTIONAL - Credentials/certificate might be used if supplied
|
||||
CREDS_OPTIONAL
|
||||
// CredsRequire - Credentials/certificate required for thi type of connection.
|
||||
CredsRequire CredsRequirement = iota
|
||||
// CredsDrop - Credentials/certificate not needed and should get ignored.
|
||||
CredsDrop
|
||||
// CredsOptional - Credentials/certificate might be used if supplied
|
||||
CredsOptional
|
||||
)
|
||||
|
||||
func extractHostFromHostPort(ep string) string {
|
||||
@ -54,20 +54,20 @@ func mustSplit2(s, sep string) (string, string) {
|
||||
func schemeToCredsRequirement(schema string) CredsRequirement {
|
||||
switch schema {
|
||||
case "https", "unixs":
|
||||
return CREDS_REQUIRE
|
||||
return CredsRequire
|
||||
case "http":
|
||||
return CREDS_DROP
|
||||
return CredsDrop
|
||||
case "unix":
|
||||
// Preserving previous behavior from:
|
||||
// https://github.com/etcd-io/etcd/blob/dae29bb719dd69dc119146fc297a0628fcc1ccf8/client/v3/client.go#L212
|
||||
// that likely was a bug due to missing 'fallthrough'.
|
||||
// At the same time it seems legit to let the users decide whether they
|
||||
// want credential control or not (and 'unixs' schema is not a standard thing).
|
||||
return CREDS_OPTIONAL
|
||||
return CredsOptional
|
||||
case "":
|
||||
return CREDS_OPTIONAL
|
||||
return CredsOptional
|
||||
default:
|
||||
return CREDS_OPTIONAL
|
||||
return CredsOptional
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,7 +106,7 @@ func translateEndpoint(ep string) (addr string, serverName string, requireCreds
|
||||
if strings.Contains(ep, "://") {
|
||||
url, err := url.Parse(ep)
|
||||
if err != nil {
|
||||
return ep, ep, CREDS_OPTIONAL
|
||||
return ep, ep, CredsOptional
|
||||
}
|
||||
if url.Scheme == "http" || url.Scheme == "https" {
|
||||
return url.Host, url.Host, schemeToCredsRequirement(url.Scheme)
|
||||
@ -114,7 +114,7 @@ func translateEndpoint(ep string) (addr string, serverName string, requireCreds
|
||||
return ep, url.Host, schemeToCredsRequirement(url.Scheme)
|
||||
}
|
||||
// Handles plain addresses like 10.0.0.44:437.
|
||||
return ep, ep, CREDS_OPTIONAL
|
||||
return ep, ep, CredsOptional
|
||||
}
|
||||
|
||||
// RequiresCredentials returns whether given endpoint requires
|
||||
|
@ -25,38 +25,38 @@ func Test_interpret(t *testing.T) {
|
||||
wantServerName string
|
||||
wantRequiresCreds CredsRequirement
|
||||
}{
|
||||
{"127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
|
||||
{"localhost", "localhost", "localhost", CREDS_OPTIONAL},
|
||||
{"localhost:8080", "localhost:8080", "localhost:8080", CREDS_OPTIONAL},
|
||||
{"127.0.0.1", "127.0.0.1", "127.0.0.1", CredsOptional},
|
||||
{"localhost", "localhost", "localhost", CredsOptional},
|
||||
{"localhost:8080", "localhost:8080", "localhost:8080", CredsOptional},
|
||||
|
||||
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
|
||||
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CREDS_OPTIONAL},
|
||||
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CredsOptional},
|
||||
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CredsOptional},
|
||||
|
||||
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
|
||||
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CREDS_OPTIONAL},
|
||||
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CredsOptional},
|
||||
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CredsOptional},
|
||||
|
||||
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
|
||||
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CREDS_REQUIRE},
|
||||
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
|
||||
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CREDS_REQUIRE},
|
||||
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CredsRequire},
|
||||
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CredsRequire},
|
||||
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CredsRequire},
|
||||
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1:8080", CredsRequire},
|
||||
|
||||
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_DROP},
|
||||
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1:8080", CREDS_DROP},
|
||||
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
|
||||
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1:8080", CREDS_REQUIRE},
|
||||
{"https://localhost:20000", "localhost:20000", "localhost:20000", CREDS_REQUIRE},
|
||||
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1", CredsDrop},
|
||||
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1:8080", CredsDrop},
|
||||
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1", CredsRequire},
|
||||
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1:8080", CredsRequire},
|
||||
{"https://localhost:20000", "localhost:20000", "localhost:20000", CredsRequire},
|
||||
|
||||
{"unix:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_OPTIONAL},
|
||||
{"unixs:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_REQUIRE},
|
||||
{"unix:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc:1234", CREDS_OPTIONAL},
|
||||
{"unixs:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc:1234", CREDS_REQUIRE},
|
||||
{"etcd.io", "etcd.io", "etcd.io", CREDS_OPTIONAL},
|
||||
{"http://etcd.io/abc", "etcd.io", "etcd.io", CREDS_DROP},
|
||||
{"dns://something-other", "dns://something-other", "something-other", CREDS_OPTIONAL},
|
||||
{"unix:///tmp/abc", "unix:///tmp/abc", "abc", CredsOptional},
|
||||
{"unixs:///tmp/abc", "unix:///tmp/abc", "abc", CredsRequire},
|
||||
{"unix:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc:1234", CredsOptional},
|
||||
{"unixs:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc:1234", CredsRequire},
|
||||
{"etcd.io", "etcd.io", "etcd.io", CredsOptional},
|
||||
{"http://etcd.io/abc", "etcd.io", "etcd.io", CredsDrop},
|
||||
{"dns://something-other", "dns://something-other", "something-other", CredsOptional},
|
||||
|
||||
{"http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", CREDS_DROP},
|
||||
{"[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", CREDS_OPTIONAL},
|
||||
{"unix:unexpected-file_name#123$456", "unix:unexpected-file_name#123$456", "unexpected-file_name#123$456", CREDS_OPTIONAL},
|
||||
{"http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", CredsDrop},
|
||||
{"[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", CredsOptional},
|
||||
{"unix:unexpected-file_name#123$456", "unix:unexpected-file_name#123$456", "unexpected-file_name#123$456", CredsOptional},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run("Interpret_"+tt.endpoint, func(t *testing.T) {
|
||||
|
@ -32,7 +32,7 @@ type MockServer struct {
|
||||
ln net.Listener
|
||||
Network string
|
||||
Address string
|
||||
GrpcServer *grpc.Server
|
||||
GRPCServer *grpc.Server
|
||||
}
|
||||
|
||||
func (ms *MockServer) ResolverAddress() resolver.Address {
|
||||
@ -63,7 +63,7 @@ func StartMockServers(count int) (ms *MockServers, err error) {
|
||||
func StartMockServersOnNetwork(count int, network string) (ms *MockServers, err error) {
|
||||
switch network {
|
||||
case "tcp":
|
||||
return startMockServersTcp(count)
|
||||
return startMockServersTCP(count)
|
||||
case "unix":
|
||||
return startMockServersUnix(count)
|
||||
default:
|
||||
@ -71,7 +71,7 @@ func StartMockServersOnNetwork(count int, network string) (ms *MockServers, err
|
||||
}
|
||||
}
|
||||
|
||||
func startMockServersTcp(count int) (ms *MockServers, err error) {
|
||||
func startMockServersTCP(count int) (ms *MockServers, err error) {
|
||||
addrs := make([]string, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
addrs = append(addrs, "localhost:0")
|
||||
@ -133,12 +133,12 @@ func (ms *MockServers) StartAt(idx int) (err error) {
|
||||
svr := grpc.NewServer()
|
||||
pb.RegisterKVServer(svr, &mockKVServer{})
|
||||
pb.RegisterLeaseServer(svr, &mockLeaseServer{})
|
||||
ms.Servers[idx].GrpcServer = svr
|
||||
ms.Servers[idx].GRPCServer = svr
|
||||
|
||||
ms.wg.Add(1)
|
||||
go func(svr *grpc.Server, l net.Listener) {
|
||||
svr.Serve(l)
|
||||
}(ms.Servers[idx].GrpcServer, ms.Servers[idx].ln)
|
||||
}(ms.Servers[idx].GRPCServer, ms.Servers[idx].ln)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -151,8 +151,8 @@ func (ms *MockServers) StopAt(idx int) {
|
||||
return
|
||||
}
|
||||
|
||||
ms.Servers[idx].GrpcServer.Stop()
|
||||
ms.Servers[idx].GrpcServer = nil
|
||||
ms.Servers[idx].GRPCServer.Stop()
|
||||
ms.Servers[idx].GRPCServer = nil
|
||||
ms.Servers[idx].ln = nil
|
||||
ms.wg.Done()
|
||||
}
|
||||
|
@ -311,7 +311,7 @@ func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) erro
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return contextErrToGrpcErr(ctx.Err())
|
||||
return contextErrToGRPCErr(ctx.Err())
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
@ -349,7 +349,7 @@ func isContextError(err error) bool {
|
||||
return status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Canceled
|
||||
}
|
||||
|
||||
func contextErrToGrpcErr(err error) error {
|
||||
func contextErrToGRPCErr(err error) error {
|
||||
switch err {
|
||||
case context.DeadlineExceeded:
|
||||
return status.Errorf(codes.DeadlineExceeded, err.Error())
|
||||
|
@ -149,12 +149,12 @@ type watcher struct {
|
||||
mu sync.Mutex
|
||||
|
||||
// streams holds all the active grpc streams keyed by ctx value.
|
||||
streams map[string]*watchGrpcStream
|
||||
streams map[string]*watchGRPCStream
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
|
||||
type watchGrpcStream struct {
|
||||
// watchGRPCStream tracks all watch resources attached to a single grpc stream.
|
||||
type watchGRPCStream struct {
|
||||
owner *watcher
|
||||
remote pb.WatchClient
|
||||
callOpts []grpc.CallOption
|
||||
@ -251,7 +251,7 @@ func NewWatcher(c *Client) Watcher {
|
||||
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
|
||||
w := &watcher{
|
||||
remote: wc,
|
||||
streams: make(map[string]*watchGrpcStream),
|
||||
streams: make(map[string]*watchGRPCStream),
|
||||
}
|
||||
if c != nil {
|
||||
w.callOpts = c.callOpts
|
||||
@ -271,9 +271,9 @@ func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
|
||||
func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
|
||||
func (vc *valCtx) Err() error { return nil }
|
||||
|
||||
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
|
||||
func (w *watcher) newWatcherGRPCStream(inctx context.Context) *watchGRPCStream {
|
||||
ctx, cancel := context.WithCancel(&valCtx{inctx})
|
||||
wgs := &watchGrpcStream{
|
||||
wgs := &watchGRPCStream{
|
||||
owner: w,
|
||||
remote: w.remote,
|
||||
callOpts: w.callOpts,
|
||||
@ -334,7 +334,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
|
||||
}
|
||||
wgs := w.streams[ctxKey]
|
||||
if wgs == nil {
|
||||
wgs = w.newWatcherGrpcStream(ctx)
|
||||
wgs = w.newWatcherGRPCStream(ctx)
|
||||
w.streams[ctxKey] = wgs
|
||||
}
|
||||
donec := wgs.donec
|
||||
@ -412,7 +412,7 @@ func (w *watcher) RequestProgress(ctx context.Context) (err error) {
|
||||
}
|
||||
wgs := w.streams[ctxKey]
|
||||
if wgs == nil {
|
||||
wgs = w.newWatcherGrpcStream(ctx)
|
||||
wgs = w.newWatcherGRPCStream(ctx)
|
||||
w.streams[ctxKey] = wgs
|
||||
}
|
||||
donec := wgs.donec
|
||||
@ -435,7 +435,7 @@ func (w *watcher) RequestProgress(ctx context.Context) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *watchGrpcStream) close() (err error) {
|
||||
func (w *watchGRPCStream) close() (err error) {
|
||||
w.cancel()
|
||||
<-w.donec
|
||||
select {
|
||||
@ -445,7 +445,7 @@ func (w *watchGrpcStream) close() (err error) {
|
||||
return toErr(w.ctx, err)
|
||||
}
|
||||
|
||||
func (w *watcher) closeStream(wgs *watchGrpcStream) {
|
||||
func (w *watcher) closeStream(wgs *watchGRPCStream) {
|
||||
w.mu.Lock()
|
||||
close(wgs.donec)
|
||||
wgs.cancel()
|
||||
@ -455,7 +455,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
|
||||
w.mu.Unlock()
|
||||
}
|
||||
|
||||
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
|
||||
func (w *watchGRPCStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
|
||||
// check watch ID for backward compatibility (<= v3.3)
|
||||
if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
|
||||
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
|
||||
@ -467,7 +467,7 @@ func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream
|
||||
w.substreams[ws.id] = ws
|
||||
}
|
||||
|
||||
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
|
||||
func (w *watchGRPCStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
|
||||
select {
|
||||
case ws.outc <- *resp:
|
||||
case <-ws.initReq.ctx.Done():
|
||||
@ -476,7 +476,7 @@ func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchRespo
|
||||
close(ws.outc)
|
||||
}
|
||||
|
||||
func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
|
||||
func (w *watchGRPCStream) closeSubstream(ws *watcherStream) {
|
||||
// send channel response in case stream was never established
|
||||
select {
|
||||
case ws.initReq.retc <- ws.outc:
|
||||
@ -501,7 +501,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
|
||||
}
|
||||
|
||||
// run is the root of the goroutines for managing a watcher client
|
||||
func (w *watchGrpcStream) run() {
|
||||
func (w *watchGRPCStream) run() {
|
||||
var wc pb.Watch_WatchClient
|
||||
var closeErr error
|
||||
|
||||
@ -699,7 +699,7 @@ func (w *watchGrpcStream) run() {
|
||||
|
||||
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
|
||||
// streams are marked as nil in the queue since the head must wait for its inflight registration.
|
||||
func (w *watchGrpcStream) nextResume() *watcherStream {
|
||||
func (w *watchGRPCStream) nextResume() *watcherStream {
|
||||
for len(w.resuming) != 0 {
|
||||
if w.resuming[0] != nil {
|
||||
return w.resuming[0]
|
||||
@ -710,7 +710,7 @@ func (w *watchGrpcStream) nextResume() *watcherStream {
|
||||
}
|
||||
|
||||
// dispatchEvent sends a WatchResponse to the appropriate watcher stream
|
||||
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
||||
func (w *watchGRPCStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
||||
events := make([]*Event, len(pbresp.Events))
|
||||
for i, ev := range pbresp.Events {
|
||||
events[i] = (*Event)(ev)
|
||||
@ -736,7 +736,7 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
||||
}
|
||||
|
||||
// broadcastResponse send a watch response to all watch substreams.
|
||||
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
|
||||
func (w *watchGRPCStream) broadcastResponse(wr *WatchResponse) bool {
|
||||
for _, ws := range w.substreams {
|
||||
select {
|
||||
case ws.recvc <- wr:
|
||||
@ -747,8 +747,8 @@ func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
|
||||
}
|
||||
|
||||
// unicastResponse sends a watch response to a specific watch substream.
|
||||
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
|
||||
ws, ok := w.substreams[watchId]
|
||||
func (w *watchGRPCStream) unicastResponse(wr *WatchResponse, watchID int64) bool {
|
||||
ws, ok := w.substreams[watchID]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
@ -761,7 +761,7 @@ func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool
|
||||
}
|
||||
|
||||
// serveWatchClient forwards messages from the grpc stream to run()
|
||||
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
|
||||
func (w *watchGRPCStream) serveWatchClient(wc pb.Watch_WatchClient) {
|
||||
for {
|
||||
resp, err := wc.Recv()
|
||||
if err != nil {
|
||||
@ -780,7 +780,7 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
|
||||
}
|
||||
|
||||
// serveSubstream forwards watch responses from run() to the subscriber
|
||||
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
|
||||
func (w *watchGRPCStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
|
||||
if ws.closing {
|
||||
panic("created substream goroutine but substream is closing")
|
||||
}
|
||||
@ -877,7 +877,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
|
||||
// lazily send cancel message if events on missing id
|
||||
}
|
||||
|
||||
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
||||
func (w *watchGRPCStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
||||
// mark all substreams as resuming
|
||||
close(w.resumec)
|
||||
w.resumec = make(chan struct{})
|
||||
@ -923,7 +923,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
||||
return wc, nil
|
||||
}
|
||||
|
||||
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
|
||||
func (w *watchGRPCStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(w.resuming))
|
||||
donec := make(chan struct{})
|
||||
@ -960,7 +960,7 @@ func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan str
|
||||
}
|
||||
|
||||
// joinSubstreams waits for all substream goroutines to complete.
|
||||
func (w *watchGrpcStream) joinSubstreams() {
|
||||
func (w *watchGRPCStream) joinSubstreams() {
|
||||
for _, ws := range w.substreams {
|
||||
<-ws.donec
|
||||
}
|
||||
@ -973,7 +973,7 @@ func (w *watchGrpcStream) joinSubstreams() {
|
||||
|
||||
var maxBackoff = 100 * time.Millisecond
|
||||
|
||||
func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration {
|
||||
func (w *watchGRPCStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration {
|
||||
if isUnavailableErr(w.ctx, err) {
|
||||
// retry, but backoff
|
||||
if backoff < maxBackoff {
|
||||
@ -991,7 +991,7 @@ func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error)
|
||||
// openWatchClient retries opening a watch client until success or halt.
|
||||
// manually retry in case "ws==nil && err==nil"
|
||||
// TODO: remove FailFast=false
|
||||
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
|
||||
func (w *watchGRPCStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
|
||||
backoff := time.Millisecond
|
||||
for {
|
||||
select {
|
||||
|
@ -25,14 +25,14 @@ import (
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
)
|
||||
|
||||
func mockElection_Campaign() {
|
||||
func mockElectionCampaign() {
|
||||
fmt.Println("completed first election with e2")
|
||||
fmt.Println("completed second election with e1")
|
||||
}
|
||||
|
||||
func ExampleElection_Campaign() {
|
||||
forUnitTestsRunInMockedContext(
|
||||
mockElection_Campaign,
|
||||
mockElectionCampaign,
|
||||
func() {
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
||||
if err != nil {
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
)
|
||||
|
||||
func mockMutex_TryLock() {
|
||||
func mockMutexTryLock() {
|
||||
fmt.Println("acquired lock for s1")
|
||||
fmt.Println("cannot acquire lock for s2, as already locked in another session")
|
||||
fmt.Println("released lock for s1")
|
||||
@ -32,7 +32,7 @@ func mockMutex_TryLock() {
|
||||
|
||||
func ExampleMutex_TryLock() {
|
||||
forUnitTestsRunInMockedContext(
|
||||
mockMutex_TryLock,
|
||||
mockMutexTryLock,
|
||||
func() {
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
||||
if err != nil {
|
||||
@ -85,7 +85,7 @@ func ExampleMutex_TryLock() {
|
||||
// acquired lock for s2
|
||||
}
|
||||
|
||||
func mockMutex_Lock() {
|
||||
func mockMutexLock() {
|
||||
fmt.Println("acquired lock for s1")
|
||||
fmt.Println("released lock for s1")
|
||||
fmt.Println("acquired lock for s2")
|
||||
@ -93,7 +93,7 @@ func mockMutex_Lock() {
|
||||
|
||||
func ExampleMutex_Lock() {
|
||||
forUnitTestsRunInMockedContext(
|
||||
mockMutex_Lock,
|
||||
mockMutexLock,
|
||||
func() {
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
||||
if err != nil {
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
)
|
||||
|
||||
func mockSTM_apply() {
|
||||
func mockSTMApply() {
|
||||
fmt.Println("account sum is 500")
|
||||
}
|
||||
|
||||
@ -33,7 +33,7 @@ func mockSTM_apply() {
|
||||
// transfer between balances.
|
||||
func ExampleSTM_apply() {
|
||||
forUnitTestsRunInMockedContext(
|
||||
mockSTM_apply,
|
||||
mockSTMApply,
|
||||
func() {
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
|
||||
if err != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user