Merge pull request #2753 from yichengq/fix-remove-panic

backport #2701 to release-2.0 branch
Yicheng Qin 2015-04-28 21:17:55 -07:00
commit 58f035844c
11 changed files with 147 additions and 56 deletions


@@ -191,7 +191,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		Handler: etcdhttp.NewClientHandler(s),
 		Info:    cfg.corsInfo,
 	}
-	ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
+	ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
 	// Start the peer server in a goroutine
 	for _, l := range plns {
 		go func(l net.Listener) {


@@ -59,12 +59,6 @@ type Cluster struct {
 	id    types.ID
 	token string
 	store store.Store

-	// index is the raft index that cluster is updated at bootstrap
-	// from remote cluster info.
-	// It may have a higher value than local raft index, because it
-	// displays a further view of the cluster.
-	// TODO: upgrade it as last modified index
-	index uint64
 	sync.Mutex // guards members and removed map
 	members map[types.ID]*Member
@@ -236,8 +230,6 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
 func (c *Cluster) SetStore(st store.Store) { c.store = st }

-func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
-
 func (c *Cluster) Recover() {
 	c.members, c.removed = membersFromStore(c.store)
 }


@@ -21,7 +21,6 @@ import (
 	"log"
 	"net/http"
 	"sort"
-	"strconv"
 	"time"

 	"github.com/coreos/etcd/pkg/types"
@@ -89,21 +88,7 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (*Cluster, error) {
 			}
 			continue
 		}
-		var index uint64
-		// The header at or before v2.0.3 doesn't have this field. For backward
-		// compatibility, it checks whether the field exists.
-		if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
-			index, err = strconv.ParseUint(indexStr, 10, 64)
-			if err != nil {
-				if logerr {
-					log.Printf("etcdserver: could not parse raft index: %v", err)
-				}
-				continue
-			}
-		}
-		cl := NewClusterFromMembers("", id, membs)
-		cl.UpdateIndex(index)
-		return cl, nil
+		return NewClusterFromMembers("", id, membs), nil
 	}
 	return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
 }
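
The deleted block above shows the backward-compatibility pattern this change retires: an optional response header from older peers is treated as absent-means-zero rather than as an error. A minimal standalone sketch of that pattern (hypothetical helper name, not etcd code):

package main

import (
	"net/http"
	"strconv"
)

// parseOptionalUintHeader treats a missing header as zero, so responses
// from peers at or before v2.0.3 (which never set the field) still parse.
func parseOptionalUintHeader(h http.Header, key string) (uint64, error) {
	s := h.Get(key)
	if s == "" {
		return 0, nil // older peer: field not present
	}
	return strconv.ParseUint(s, 10, 64)
}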


@@ -18,7 +18,6 @@ import (
 	"encoding/json"
 	"log"
 	"net/http"
-	"strconv"

 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/rafthttp"
@@ -29,10 +28,9 @@ const (
 )

 // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
-func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
+func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
 	mh := &peerMembersHandler{
 		clusterInfo: clusterInfo,
-		timer:       timer,
 	}

 	mux := http.NewServeMux()
@@ -45,7 +43,6 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {

 type peerMembersHandler struct {
 	clusterInfo etcdserver.ClusterInfo
-	timer       etcdserver.RaftTimer
 }

 func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -53,7 +50,6 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
-	w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))

 	if r.URL.Path != peerMembersPrefix {
 		http.Error(w, "bad path", http.StatusBadRequest)


@@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
 	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		w.Write([]byte("test data"))
 	})
-	ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
+	ph := NewPeerHandler(&fakeCluster{}, h)
 	srv := httptest.NewServer(ph)
 	defer srv.Close()
@@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
 		id:      1,
 		members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
 	}
-	h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
+	h := &peerMembersHandler{clusterInfo: cluster}
 	msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
 	if err != nil {
 		t.Fatal(err)


@@ -158,6 +158,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	haveWAL := wal.Exist(cfg.WALDir())
 	ss := snap.New(cfg.SnapDir())

+	var remotes []*Member
 	switch {
 	case !haveWAL && !cfg.NewCluster:
 		if err := cfg.VerifyJoinExisting(); err != nil {
@@ -170,7 +171,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
 			return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
 		}
-		cfg.Cluster.UpdateIndex(existingCluster.index)
+		remotes = existingCluster.Members()
 		cfg.Cluster.SetID(existingCluster.id)
 		cfg.Cluster.SetStore(st)
 		cfg.Print()
@@ -260,8 +261,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
 	}

+	// TODO: move transport initialization near the definition of remote
 	tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
-	// add all the remote members into sendhub
+	// add all remotes into transport
+	for _, m := range remotes {
+		if m.ID != id {
+			tr.AddRemote(m.ID, m.PeerURLs)
+		}
+	}
 	for _, m := range cfg.Cluster.Members() {
 		if m.ID != id {
 			tr.AddPeer(m.ID, m.PeerURLs)
@@ -395,19 +402,15 @@ func (s *EtcdServer) run() {
 				if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
 					log.Panicf("recovery store error: %v", err)
 				}
-				// It avoids snapshot recovery overwriting newer cluster and
-				// transport setting, which may block the communication.
-				if s.Cluster.index < rd.Snapshot.Metadata.Index {
-					s.Cluster.Recover()
-					// recover raft transport
-					s.r.transport.RemoveAllPeers()
-					for _, m := range s.Cluster.Members() {
-						if m.ID == s.ID() {
-							continue
-						}
-						s.r.transport.AddPeer(m.ID, m.PeerURLs)
-					}
-				}
+				s.Cluster.Recover()
+
+				// recover raft transport
+				s.r.transport.RemoveAllPeers()
+				for _, m := range s.Cluster.Members() {
+					if m.ID == s.ID() {
+						continue
+					}
+					s.r.transport.AddPeer(m.ID, m.PeerURLs)
+				}

 				appliedi = rd.Snapshot.Metadata.Index
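
The change above removes the `s.Cluster.index < rd.Snapshot.Metadata.Index` guard: membership and the raft transport are now always rebuilt from the snapshot. A condensed sketch of that recover-then-rebuild step, with assumed parameters (the real code operates on s.Cluster and s.r.transport directly):

import (
	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/rafthttp"
)

// rebuildPeers drops every known peer and re-adds the peers recorded in
// the recovered membership, skipping the local member itself.
func rebuildPeers(tr rafthttp.Transporter, members []*etcdserver.Member, local types.ID) {
	tr.RemoveAllPeers()
	for _, m := range members {
		if m.ID == local {
			continue
		}
		tr.AddPeer(m.ID, m.PeerURLs)
	}
}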


@@ -1389,6 +1389,7 @@ type nopTransporter struct{}

 func (s *nopTransporter) Handler() http.Handler { return nil }
 func (s *nopTransporter) Send(m []raftpb.Message) {}
+func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
 func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
 func (s *nopTransporter) RemovePeer(id types.ID) {}
 func (s *nopTransporter) RemoveAllPeers() {}


@@ -170,6 +170,46 @@ func TestForceNewCluster(t *testing.T) {
 	clusterMustProgress(t, c.Members[:1])
 }

+// Ensure we can remove a member then add a new one back immediately.
+func TestIssue2681(t *testing.T) {
+	defer afterTest(t)
+	c := NewCluster(t, 5)
+	c.Launch(t)
+	defer c.Terminate(t)
+
+	c.RemoveMember(t, uint64(c.Members[4].s.ID()))
+	c.waitLeader(t, c.Members)
+
+	c.AddMember(t)
+	c.waitLeader(t, c.Members)
+	clusterMustProgress(t, c.Members)
+}
+
+// Ensure we can remove a member after a snapshot then add a new one back.
+func TestIssue2746(t *testing.T) {
+	defer afterTest(t)
+	c := NewCluster(t, 5)
+
+	for _, m := range c.Members {
+		m.SnapCount = 10
+	}
+
+	c.Launch(t)
+	defer c.Terminate(t)
+
+	// force a snapshot
+	for i := 0; i < 20; i++ {
+		clusterMustProgress(t, c.Members)
+	}
+
+	c.RemoveMember(t, uint64(c.Members[4].s.ID()))
+	c.waitLeader(t, c.Members)
+
+	c.AddMember(t)
+	c.waitLeader(t, c.Members)
+	clusterMustProgress(t, c.Members)
+}
+
 // clusterMustProgress ensures that the cluster can make progress. It creates
 // a random key first, and checks that the new key can be retrieved from all
 // client URLs of the cluster.
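
The helper described above is defined elsewhere in this test package; the following is only a rough, self-contained sketch of the same progress check against the etcd v2 keys HTTP API (hypothetical names, not the suite's actual implementation):

package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"
)

// mustProgress writes a fresh key through the first member and then
// polls every client URL until the key is visible there, proving the
// cluster still commits writes and serves reads.
func mustProgress(clientURLs []string) error {
	key := fmt.Sprintf("/v2/keys/probe-%d", time.Now().UnixNano())
	body := strings.NewReader(url.Values{"value": {"x"}}.Encode())
	req, err := http.NewRequest("PUT", clientURLs[0]+key, body)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	if _, err := http.DefaultClient.Do(req); err != nil {
		return err
	}
	for _, u := range clientURLs {
		deadline := time.Now().Add(5 * time.Second)
		for {
			resp, err := http.Get(u + key)
			if err == nil {
				resp.Body.Close()
				if resp.StatusCode == http.StatusOK {
					break
				}
			}
			if time.Now().After(deadline) {
				return fmt.Errorf("key %s not visible via %s", key, u)
			}
			time.Sleep(10 * time.Millisecond)
		}
	}
	return nil
}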
@@ -526,7 +566,7 @@ func (m *member) Launch() error {
 	m.s.SyncTicker = time.Tick(500 * time.Millisecond)
 	m.s.Start()

-	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
+	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}

 	for _, ln := range m.PeerListeners {
 		hs := &httptest.Server{


@@ -199,7 +199,7 @@ func (p *peer) handle() {
 				log.Printf("sender: the connection with %s became inactive", p.id)
 				p.active = false
 			}
-			if m.Type == raftpb.MsgApp {
+			if m.Type == raftpb.MsgApp && p.fs != nil {
 				p.fs.Fail()
 			}
 		} else {
@@ -208,7 +208,7 @@ func (p *peer) handle() {
 				p.active = true
 				p.errored = nil
 			}
-			if m.Type == raftpb.MsgApp {
+			if m.Type == raftpb.MsgApp && p.fs != nil {
 				p.fs.Succ(end.Sub(start))
 			}
 		}

rafthttp/remote.go (new file)

@@ -0,0 +1,42 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"net/http"
+
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type remote struct {
+	id   types.ID
+	peer *peer
+}
+
+func startRemote(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, errorc chan error) *remote {
+	return &remote{
+		id:   to,
+		peer: NewPeer(tr, u, to, cid, r, nil, errorc),
+	}
+}
+
+func (g *remote) Send(m raftpb.Message) {
+	g.peer.send(m)
+}
+
+func (g *remote) Stop() {
+	g.peer.Stop()
+}
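
A remote reuses the peer machinery but passes nil for the follower-stats argument of NewPeer, which is why the p.fs != nil guards were added in the peer.handle hunks above. A tiny illustration of that nil-tolerant pattern (stand-in type, not the etcd source):

package main

import "time"

// followerStats stands in for the leader-side stats a normal peer
// carries; a remote's underlying peer holds a nil pointer instead.
type followerStats struct{ succ, fail int }

func (fs *followerStats) Succ(d time.Duration) { fs.succ++ }
func (fs *followerStats) Fail()                { fs.fail++ }

// record mirrors the guarded calls in peer.handle: with a nil fs
// (the remote case) the stats update is simply skipped.
func record(fs *followerStats, ok bool, d time.Duration) {
	if fs == nil {
		return
	}
	if ok {
		fs.Succ(d)
	} else {
		fs.Fail()
	}
}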


@@ -35,6 +35,12 @@ type Raft interface {
 type Transporter interface {
 	Handler() http.Handler
 	Send(m []raftpb.Message)
+	// AddRemote adds a remote with the given peer URLs to the transport.
+	// A remote helps a newly joined member catch up with the progress of
+	// the cluster, and is not used after that.
+	// It is the caller's responsibility to ensure that the URLs are all
+	// valid; otherwise it panics.
+	AddRemote(id types.ID, urls []string)
 	AddPeer(id types.ID, urls []string)
 	RemovePeer(id types.ID)
 	RemoveAllPeers()
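
Per the contract stated in the comment, AddRemote panics on an unparsable URL (see the url.Parse call in its implementation below). A hypothetical in-package wrapper, assuming "fmt" and "net/url" imports, for callers that cannot guarantee their input:

// addRemoteChecked validates every URL before handing them to
// AddRemote, turning the documented panic into a returned error.
func addRemoteChecked(tr Transporter, id types.ID, urls []string) error {
	for _, s := range urls {
		if _, err := url.Parse(s); err != nil {
			return fmt.Errorf("invalid peer url %q: %v", s, err)
		}
	}
	tr.AddRemote(id, urls)
	return nil
}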
@@ -50,9 +56,10 @@ type transport struct {
 	serverStats *stats.ServerStats
 	leaderStats *stats.LeaderStats

-	mu     sync.RWMutex       // protect the peer map
-	peers  map[types.ID]*peer // remote peers
-	errorc chan error
+	mu      sync.RWMutex         // protects the remote and peer maps
+	remotes map[types.ID]*remote // helps newly joined members catch up
+	peers   map[types.ID]*peer   // peers map
+	errorc  chan error
 }

 func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
@@ -63,6 +70,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
 		raft:        r,
 		serverStats: ss,
 		leaderStats: ls,
+		remotes:     make(map[types.ID]*remote),
 		peers:       make(map[types.ID]*peer),
 		errorc:      errorc,
 	}
@@ -90,21 +98,30 @@ func (t *transport) Send(msgs []raftpb.Message) {
 			continue
 		}
 		to := types.ID(m.To)
+
 		p, ok := t.peers[to]
-		if !ok {
-			log.Printf("etcdserver: send message to unknown receiver %s", to)
+		if ok {
+			if m.Type == raftpb.MsgApp {
+				t.serverStats.SendAppendReq(m.Size())
+			}
+			p.Send(m)
 			continue
 		}
-		if m.Type == raftpb.MsgApp {
-			t.serverStats.SendAppendReq(m.Size())
+
+		g, ok := t.remotes[to]
+		if ok {
+			g.Send(m)
+			continue
 		}
-		p.Send(m)
+
+		log.Printf("etcdserver: send message to unknown receiver %s", to)
 	}
 }

 func (t *transport) Stop() {
+	for _, r := range t.remotes {
+		r.Stop()
+	}
 	for _, p := range t.peers {
 		p.Stop()
 	}
@@ -113,6 +130,21 @@ func (t *transport) Stop() {
 	}
 }

+func (t *transport) AddRemote(id types.ID, us []string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if _, ok := t.remotes[id]; ok {
+		return
+	}
+	peerURL := us[0]
+	u, err := url.Parse(peerURL)
+	if err != nil {
+		log.Panicf("unexpected peer url %s", peerURL)
+	}
+	u.Path = path.Join(u.Path, RaftPrefix)
+	t.remotes[id] = startRemote(t.roundTripper, u.String(), t.id, id, t.clusterID, t.raft, t.errorc)
+}
+
 func (t *transport) AddPeer(id types.ID, urls []string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
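
Taken together, the new Send path gives an established peer precedence, falls back to a remote for members known only from bootstrap, and merely logs anything else. A condensed sketch of that dispatch order (locking and the MsgApp stats update omitted):

func dispatch(t *transport, m raftpb.Message) {
	to := types.ID(m.To)
	if p, ok := t.peers[to]; ok {
		p.Send(m) // established peer wins
		return
	}
	if g, ok := t.remotes[to]; ok {
		g.Send(m) // remote: catch-up path for a not-yet-peered member
		return
	}
	log.Printf("etcdserver: send message to unknown receiver %s", to)
}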