Merge pull request #710 from xiangli-cmu/fix_race

Fix race
This commit is contained in:
Xiang Li 2014-04-14 15:53:04 -04:00
commit 2dc182189a
7 changed files with 53 additions and 17 deletions

View File

@ -1,6 +1,7 @@
package server package server
import ( import (
"sync"
"time" "time"
"github.com/coreos/etcd/third_party/github.com/goraft/raft" "github.com/coreos/etcd/third_party/github.com/goraft/raft"
@ -27,6 +28,8 @@ type raftServerStats struct {
sendRateQueue *statsQueue sendRateQueue *statsQueue
recvRateQueue *statsQueue recvRateQueue *statsQueue
sync.Mutex
} }
func NewRaftServerStats(name string) *raftServerStats { func NewRaftServerStats(name string) *raftServerStats {
@ -43,6 +46,9 @@ func NewRaftServerStats(name string) *raftServerStats {
} }
func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
ss.Lock()
defer ss.Unlock()
ss.State = raft.Follower ss.State = raft.Follower
if leaderName != ss.LeaderInfo.Name { if leaderName != ss.LeaderInfo.Name {
ss.LeaderInfo.Name = leaderName ss.LeaderInfo.Name = leaderName
@ -54,6 +60,9 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
} }
func (ss *raftServerStats) SendAppendReq(pkgSize int) { func (ss *raftServerStats) SendAppendReq(pkgSize int) {
ss.Lock()
defer ss.Unlock()
now := time.Now() now := time.Now()
if ss.State != raft.Leader { if ss.State != raft.Leader {

View File

@ -45,6 +45,9 @@ func NewRegistry(s store.Store) *Registry {
// Peers returns a list of cached peer names. // Peers returns a list of cached peer names.
func (r *Registry) Peers() []string { func (r *Registry) Peers() []string {
r.Lock()
defer r.Unlock()
names := make([]string, 0, len(r.peers)) names := make([]string, 0, len(r.peers))
for name := range r.peers { for name := range r.peers {
names = append(names, name) names = append(names, name)
@ -55,6 +58,9 @@ func (r *Registry) Peers() []string {
// Proxies returns a list of cached proxy names. // Proxies returns a list of cached proxy names.
func (r *Registry) Proxies() []string { func (r *Registry) Proxies() []string {
r.Lock()
defer r.Unlock()
names := make([]string, 0, len(r.proxies)) names := make([]string, 0, len(r.proxies))
for name := range r.proxies { for name := range r.proxies {
names = append(names, name) names = append(names, name)
@ -68,6 +74,9 @@ func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) err
if err := r.register(RegistryPeerKey, name, peerURL, machURL); err != nil { if err := r.register(RegistryPeerKey, name, peerURL, machURL); err != nil {
return err return err
} }
r.Lock()
defer r.Unlock()
r.peers[name] = r.load(RegistryPeerKey, name) r.peers[name] = r.load(RegistryPeerKey, name)
return nil return nil
} }
@ -77,14 +86,14 @@ func (r *Registry) RegisterProxy(name string, peerURL string, machURL string) er
if err := r.register(RegistryProxyKey, name, peerURL, machURL); err != nil { if err := r.register(RegistryProxyKey, name, peerURL, machURL); err != nil {
return err return err
} }
r.Lock()
defer r.Unlock()
r.proxies[name] = r.load(RegistryProxyKey, name) r.proxies[name] = r.load(RegistryProxyKey, name)
return nil return nil
} }
func (r *Registry) register(key, name string, peerURL string, machURL string) error { func (r *Registry) register(key, name string, peerURL string, machURL string) error {
r.Lock()
defer r.Unlock()
// Write data to store. // Write data to store.
v := url.Values{} v := url.Values{}
v.Set("raft", peerURL) v.Set("raft", peerURL)
@ -105,9 +114,6 @@ func (r *Registry) UnregisterProxy(name string) error {
} }
func (r *Registry) unregister(key, name string) error { func (r *Registry) unregister(key, name string) error {
r.Lock()
defer r.Unlock()
// Remove the key from the store. // Remove the key from the store.
_, err := r.store.Delete(path.Join(key, name), false, false) _, err := r.store.Delete(path.Join(key, name), false, false)
log.Debugf("Unregister: %s", name) log.Debugf("Unregister: %s", name)
@ -282,6 +288,9 @@ func (r *Registry) urls(key, leaderName, selfName string, url func(key, name str
// Removes a node from the cache. // Removes a node from the cache.
func (r *Registry) Invalidate(name string) { func (r *Registry) Invalidate(name string) {
r.Lock()
defer r.Unlock()
delete(r.peers, name) delete(r.peers, name)
delete(r.proxies, name) delete(r.proxies, name)
} }

View File

@ -135,6 +135,7 @@ func (s *store) Create(nodePath string, dir bool, value string, unique bool, exp
e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create) e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
if err == nil { if err == nil {
s.WatcherHub.notify(e)
s.Stats.Inc(CreateSuccess) s.Stats.Inc(CreateSuccess)
} else { } else {
s.Stats.Inc(CreateFail) s.Stats.Inc(CreateFail)
@ -178,6 +179,8 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim
e.PrevNode = prev.Node e.PrevNode = prev.Node
} }
s.WatcherHub.notify(e)
return e, nil return e, nil
} }
@ -524,8 +527,6 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique,
s.CurrentIndex = nextIndex s.CurrentIndex = nextIndex
s.WatcherHub.notify(e)
return e, nil return e, nil
} }

14
test.sh
View File

@ -3,28 +3,28 @@
. ./build . ./build
go test -i ./http go test -i ./http
go test -v ./http go test -v ./http -race
go test -i ./store go test -i ./store
go test -v ./store go test -v ./store -race
go test -i ./server go test -i ./server
go test -v ./server go test -v ./server -race
go test -i ./config go test -i ./config
go test -v ./config go test -v ./config -race
go test -i ./server/v1/tests go test -i ./server/v1/tests
go test -v ./server/v1/tests go test -v ./server/v1/tests -race
go test -i ./server/v2/tests go test -i ./server/v2/tests
go test -v ./server/v2/tests go test -v ./server/v2/tests -race
go test -i ./mod/lock/v2/tests go test -i ./mod/lock/v2/tests
go test -v ./mod/lock/v2/tests go test -v ./mod/lock/v2/tests
go test -i ./tests/functional go test -i ./tests/functional
ETCD_BIN_PATH=$(pwd)/bin/etcd go test -v ./tests/functional ETCD_BIN_PATH=$(pwd)/bin/etcd go test -v ./tests/functional -race
fmtRes=`gofmt -l $GOFMTPATH` fmtRes=`gofmt -l $GOFMTPATH`
if [ "$fmtRes" != "" ]; then if [ "$fmtRes" != "" ]; then

View File

@ -7,6 +7,7 @@ import (
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -20,6 +21,7 @@ import (
type garbageHandler struct { type garbageHandler struct {
t *testing.T t *testing.T
success bool success bool
sync.Mutex
} }
func (g *garbageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (g *garbageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -27,6 +29,9 @@ func (g *garbageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.String() != "/v2/keys/_etcd/registry/1/node1" { if r.URL.String() != "/v2/keys/_etcd/registry/1/node1" {
g.t.Fatalf("Unexpected web request") g.t.Fatalf("Unexpected web request")
} }
g.Lock()
defer g.Unlock()
g.success = true g.success = true
} }
@ -51,6 +56,8 @@ func TestDiscoveryDownNoBackupPeers(t *testing.T) {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }
g.Lock()
defer g.Unlock()
if !g.success { if !g.success {
t.Fatal("Discovery server never called") t.Fatal("Discovery server never called")
} }
@ -82,6 +89,8 @@ func TestDiscoveryDownWithBackupPeers(t *testing.T) {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }
g.Lock()
defer g.Unlock()
if !g.success { if !g.success {
t.Fatal("Discovery server never called") t.Fatal("Discovery server never called")
} }

View File

@ -6,17 +6,23 @@ import (
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"os" "os"
"sync"
"testing" "testing"
"time" "time"
) )
// Ensure that etcd does not come up if the internal raft versions do not match. // Ensure that etcd does not come up if the internal raft versions do not match.
func TestInternalVersion(t *testing.T) { func TestInternalVersion(t *testing.T) {
var mu sync.Mutex
checkedVersion := false checkedVersion := false
testMux := http.NewServeMux() testMux := http.NewServeMux()
testMux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { testMux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "This is not a version number") fmt.Fprintln(w, "This is not a version number")
mu.Lock()
defer mu.Unlock()
checkedVersion = true checkedVersion = true
}) })
@ -48,6 +54,8 @@ func TestInternalVersion(t *testing.T) {
return return
} }
mu.Lock()
defer mu.Unlock()
if checkedVersion == false { if checkedVersion == false {
t.Fatal("etcd did not check the version") t.Fatal("etcd did not check the version")
return return

View File

@ -56,7 +56,7 @@ func TestSnapshot(t *testing.T) {
index, _ := strconv.Atoi(snapshots[0].Name()[2:5]) index, _ := strconv.Atoi(snapshots[0].Name()[2:5])
if index < 507 || index > 510 { if index < 507 || index > 515 {
t.Fatal("wrong name of snapshot :", snapshots[0].Name()) t.Fatal("wrong name of snapshot :", snapshots[0].Name())
} }
@ -89,7 +89,7 @@ func TestSnapshot(t *testing.T) {
index, _ = strconv.Atoi(snapshots[0].Name()[2:6]) index, _ = strconv.Atoi(snapshots[0].Name()[2:6])
if index < 1014 || index > 1017 { if index < 1014 || index > 1025 {
t.Fatal("wrong name of snapshot :", snapshots[0].Name()) t.Fatal("wrong name of snapshot :", snapshots[0].Name())
} }
} }