diff --git a/clientv3/client.go b/clientv3/client.go index f6a54f632..cec5695b2 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -28,15 +28,12 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/grpclog" ) var ( ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") ) -type Logger grpclog.Logger - // Client provides and manages an etcd v3 client session. type Client struct { Cluster @@ -54,8 +51,6 @@ type Client struct { ctx context.Context cancel context.CancelFunc - - logger Logger } // EndpointDialer is a policy for choosing which endpoint to dial next @@ -190,13 +185,11 @@ func newClient(cfg *Config) (*Client, error) { client.Watcher = NewWatcher(client) client.Auth = NewAuth(client) client.Maintenance = &maintenance{c: client} - if cfg.Logger == nil { - client.logger = log.New(ioutil.Discard, "", 0) - // disable client side grpc by default - grpclog.SetLogger(log.New(ioutil.Discard, "", 0)) + if cfg.Logger != nil { + logger.Set(cfg.Logger) } else { - client.logger = cfg.Logger - grpclog.SetLogger(cfg.Logger) + // disable client side grpc by default + logger.Set(log.New(ioutil.Discard, "", 0)) } return client, nil diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index e6081ef66..c24249344 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -18,6 +18,7 @@ import ( "fmt" "reflect" "sort" + "sync/atomic" "testing" "time" @@ -379,17 +380,19 @@ func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNot func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) { defer testutil.AfterTest(t) + // accelerate report interval so test terminates quickly + oldpi := v3rpc.ProgressReportIntervalMilliseconds + // using atomics to avoid race warnings + atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000) + pi := 3 * time.Second + defer func() { v3rpc.ProgressReportIntervalMilliseconds = oldpi }() + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) wc := clientv3.NewWatcher(clus.RandClient()) defer wc.Close() - testInterval := 3 * time.Second - pi := v3rpc.ProgressReportInterval - v3rpc.ProgressReportInterval = testInterval - defer func() { v3rpc.ProgressReportInterval = pi }() - opts := []clientv3.OpOption{clientv3.WithProgressNotify()} if watchOnPut { opts = append(opts, clientv3.WithPrefix()) diff --git a/clientv3/kv.go b/clientv3/kv.go index 9b7d31167..04e33688b 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -183,14 +183,18 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { } func (kv *kv) switchRemote(prevErr error) error { + // Usually it's a bad idea to lock on network i/o but here it's OK + // since the link is down and new requests can't be processed anyway. + // Likewise, if connecting stalls, closing the Client can break the + // lock via context cancelation. + kv.mu.Lock() + defer kv.mu.Unlock() + newConn, err := kv.c.retryConnection(kv.conn, prevErr) if err != nil { return err } - kv.mu.Lock() - defer kv.mu.Unlock() - kv.conn = newConn kv.remote = pb.NewKVClient(kv.conn) return nil diff --git a/clientv3/logger.go b/clientv3/logger.go new file mode 100644 index 000000000..47a31ff05 --- /dev/null +++ b/clientv3/logger.go @@ -0,0 +1,64 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "log" + "os" + "sync" + + "google.golang.org/grpc/grpclog" +) + +type Logger grpclog.Logger + +var ( + logger settableLogger +) + +type settableLogger struct { + l grpclog.Logger + mu sync.RWMutex +} + +func init() { + // use go's standard logger by default like grpc + logger.mu.Lock() + logger.l = log.New(os.Stderr, "", log.LstdFlags) + grpclog.SetLogger(&logger) + logger.mu.Unlock() +} + +func (s *settableLogger) Set(l Logger) { + s.mu.Lock() + logger.l = l + s.mu.Unlock() +} + +func (s *settableLogger) Get() Logger { + s.mu.RLock() + l := logger.l + s.mu.RUnlock() + return l +} + +// implement the grpclog.Logger interface + +func (s *settableLogger) Fatal(args ...interface{}) { s.Get().Fatal(args...) } +func (s *settableLogger) Fatalf(format string, args ...interface{}) { s.Get().Fatalf(format, args...) } +func (s *settableLogger) Fatalln(args ...interface{}) { s.Get().Fatalln(args...) } +func (s *settableLogger) Print(args ...interface{}) { s.Get().Print(args...) } +func (s *settableLogger) Printf(format string, args ...interface{}) { s.Get().Printf(format, args...) } +func (s *settableLogger) Println(args ...interface{}) { s.Get().Println(args...) } diff --git a/clientv3/watch.go b/clientv3/watch.go index 17b1bc9d1..fc49dd17e 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -240,11 +240,11 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { w.streams[ws.id] = ws w.mu.Unlock() - // send messages to subscriber - go w.serveStream(ws) - // pass back the subscriber channel for the watcher pendingReq.retc <- ret + + // send messages to subscriber + go w.serveStream(ws) } // closeStream closes the watcher resources and removes it @@ -436,11 +436,15 @@ func (w *watcher) serveStream(ws *watcherStream) { // TODO don't keep buffering if subscriber stops reading wrs = append(wrs, wr) case resumeRev := <-ws.resumec: + wrs = nil + resuming = true + if resumeRev == -1 { + // pause serving stream while resume gets set up + break + } if resumeRev != ws.lastRev { panic("unexpected resume revision") } - wrs = nil - resuming = true case <-w.donec: closing = true case <-ws.initReq.ctx.Done(): @@ -502,6 +506,9 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error { w.mu.RUnlock() for _, ws := range streams { + // pause serveStream + ws.resumec <- -1 + // reconstruct watcher from initial request if ws.lastRev != 0 { ws.initReq.rev = ws.lastRev @@ -525,6 +532,7 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error { w.streams[ws.id] = ws w.mu.Unlock() + // unpause serveStream ws.resumec <- ws.lastRev } return nil diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index f89be8fe5..d7f4ea842 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -16,6 +16,7 @@ package v3rpc import ( "io" + "sync" "time" "github.com/coreos/etcd/etcdserver" @@ -42,8 +43,9 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { var ( // expose for testing purpose. External test can change this to a - // small value to finish fast. - ProgressReportInterval = 10 * time.Minute + // small value to finish fast. The type is int32 instead of time.Duration + // in order to placate the race detector by setting the value with atomic stores. + ProgressReportIntervalMilliseconds = int32(10 * 60 * 1000) // 10 minutes ) const ( @@ -71,6 +73,8 @@ type serverWatchStream struct { // progress tracks the watchID that stream might need to send // progress to. progress map[storage.WatchID]bool + // mu protects progress + mu sync.Mutex // closec indicates the stream is closed. closec chan struct{} @@ -144,7 +148,9 @@ func (sws *serverWatchStream) recvLoop() error { WatchId: id, Canceled: true, } + sws.mu.Lock() delete(sws.progress, storage.WatchID(id)) + sws.mu.Unlock() } } // TODO: do we need to return error back to client? @@ -160,7 +166,8 @@ func (sws *serverWatchStream) sendLoop() { // watch responses pending on a watch id creation message pending := make(map[storage.WatchID][]*pb.WatchResponse) - progressTicker := time.NewTicker(ProgressReportInterval) + interval := time.Duration(ProgressReportIntervalMilliseconds) * time.Millisecond + progressTicker := time.NewTicker(interval) defer progressTicker.Stop() for { @@ -198,9 +205,11 @@ func (sws *serverWatchStream) sendLoop() { return } + sws.mu.Lock() if _, ok := sws.progress[wresp.WatchID]; ok { sws.progress[wresp.WatchID] = false } + sws.mu.Unlock() case c, ok := <-sws.ctrlStream: if !ok { diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index e48a4f151..79da449ad 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -20,6 +20,7 @@ import ( "reflect" "sort" "sync" + "sync/atomic" "testing" "time" @@ -922,10 +923,12 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat } func TestWatchWithProgressNotify(t *testing.T) { + // accelerate report interval so test terminates quickly + oldpi := v3rpc.ProgressReportIntervalMilliseconds + // using atomics to avoid race warnings + atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000) testInterval := 3 * time.Second - pi := v3rpc.ProgressReportInterval - v3rpc.ProgressReportInterval = testInterval - defer func() { v3rpc.ProgressReportInterval = pi }() + defer func() { v3rpc.ProgressReportIntervalMilliseconds = oldpi }() defer testutil.AfterTest(t) clus := NewClusterV3(t, &ClusterConfig{Size: 3}) diff --git a/rafthttp/transport.go b/rafthttp/transport.go index f9ee78bdf..3c4ce3c19 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -165,10 +165,11 @@ func (t *Transport) Send(msgs []raftpb.Message) { to := types.ID(m.To) t.mu.RLock() - p, ok := t.peers[to] + p, pok := t.peers[to] + g, rok := t.remotes[to] t.mu.RUnlock() - if ok { + if pok { if m.Type == raftpb.MsgApp { t.ServerStats.SendAppendReq(m.Size()) } @@ -176,8 +177,7 @@ func (t *Transport) Send(msgs []raftpb.Message) { continue } - g, ok := t.remotes[to] - if ok { + if rok { g.send(m) continue } diff --git a/test b/test index 60551b6ed..208523d28 100755 --- a/test +++ b/test @@ -45,14 +45,13 @@ split=(${TEST// / }) TEST=${split[@]/#/${REPO_PATH}/} split=(${NO_RACE_TEST// / }) NO_RACE_TEST=${split[@]/#/${REPO_PATH}/} +MACHINE_TYPE=$(uname -m) +if [ $MACHINE_TYPE != "armv7l" ]; then + RACE="--race" +fi function unit_tests { echo "Running tests..." - - MACHINE_TYPE=$(uname -m) - if [ $MACHINE_TYPE != "armv7l" ]; then - RACE="--race" - fi go test -timeout 3m ${COVER} ${RACE} -cpu 1,2,4 $@ ${TEST} go test -timeout 3m ${COVER} -cpu 1,2,4 $@ ${NO_RACE_TEST} } @@ -61,7 +60,7 @@ function integration_tests { echo "Running integration tests..." go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/e2e go test -timeout 15m -v -cpu 1,2,4 $@ ${REPO_PATH}/integration - go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration + go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample }