Merge pull request #2035 from xiang90/errorc

etcdserver: collect error from errorc
This commit is contained in:
Xiang Li 2015-01-05 11:29:01 -08:00
commit 6b8667152b
6 changed files with 42 additions and 54 deletions

View File

@ -121,8 +121,9 @@ type RaftTimer interface {
type EtcdServer struct {
cfg *ServerConfig
w wait.Wait
done chan struct{}
stop chan struct{}
done chan struct{}
errorc chan error
id types.ID
attributes Attributes
@ -253,6 +254,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
srv := &EtcdServer{
cfg: cfg,
errorc: make(chan error, 1),
store: st,
node: n,
raftStorage: s,
@ -268,7 +270,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
}
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, sstats, lstats)
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
// add all the remote members into sendhub
for _, m := range cfg.Cluster.Members() {
if m.Name != cfg.Name {
@ -341,7 +343,6 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
func (s *EtcdServer) run() {
var syncC <-chan time.Time
var shouldstop bool
shouldstopC := s.transport.ShouldStopNotify()
// load initial state from raft storage
snap, err := s.raftStorage.Snapshot()
@ -420,9 +421,7 @@ func (s *EtcdServer) run() {
}
if len(ents) > 0 {
if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
m1 := fmt.Sprintf("etcdserver: removed local member %s from cluster %s", s.ID(), s.Cluster.ID())
m2 := fmt.Sprint("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
go s.stopWithDelay(10*100*time.Millisecond, m1, m2)
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
}
}
}
@ -436,7 +435,9 @@ func (s *EtcdServer) run() {
}
case <-syncC:
s.sync(defaultSyncTimeout)
case <-shouldstopC:
case err := <-s.errorc:
log.Printf("etcdserver: %s", err)
log.Printf("etcdserver: the data-dir used by this member must be removed.")
return
case <-s.stop:
return
@ -447,24 +448,20 @@ func (s *EtcdServer) run() {
// Stop stops the server gracefully, and shuts down the running goroutine.
// Stop should be called after a Start(s), otherwise it will block forever.
func (s *EtcdServer) Stop() {
s.stopWithMessages()
}
func (s *EtcdServer) stopWithMessages(msgs ...string) {
select {
case s.stop <- struct{}{}:
for _, msg := range msgs {
log.Println(msg)
}
case <-s.done:
return
}
<-s.done
}
func (s *EtcdServer) stopWithDelay(d time.Duration, msgs ...string) {
func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
time.Sleep(d)
s.stopWithMessages(msgs...)
select {
case s.errorc <- err:
default:
}
}
// StopNotify returns a channel that receives a empty struct

View File

@ -1370,6 +1370,5 @@ func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporter) Stop() {}
func (s *nopTransporter) ShouldStopNotify() <-chan struct{} { return nil }
func (s *nopTransporter) Pause() {}
func (s *nopTransporter) Resume() {}

View File

@ -49,10 +49,10 @@ type peer struct {
id types.ID
cid types.ID
tr http.RoundTripper
r Raft
fs *stats.FollowerStats
shouldstop chan struct{}
tr http.RoundTripper
r Raft
fs *stats.FollowerStats
errorc chan error
batcher *Batcher
propBatcher *ProposalBatcher
@ -72,7 +72,7 @@ type peer struct {
paused bool
}
func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, shouldstop chan struct{}) *peer {
func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
p := &peer{
id: id,
active: true,
@ -82,7 +82,7 @@ func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft,
r: r,
fs: fs,
stream: &stream{},
shouldstop: shouldstop,
errorc: errorc,
batcher: NewBatcher(100, appRespBatchMs*time.Millisecond),
propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
q: make(chan *raftpb.Message, senderBufSize),
@ -224,19 +224,18 @@ func (p *peer) post(data []byte) error {
switch resp.StatusCode {
case http.StatusPreconditionFailed:
err := fmt.Errorf("conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
select {
case p.shouldstop <- struct{}{}:
case p.errorc <- err:
default:
}
log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
return nil
case http.StatusForbidden:
err := fmt.Errorf("the member has been permanently removed from the cluster")
select {
case p.shouldstop <- struct{}{}:
case p.errorc <- err:
default:
}
log.Println("rafthttp: this member has been permanently removed from the cluster")
log.Println("rafthttp: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
return nil
case http.StatusNoContent:
return nil

View File

@ -144,8 +144,7 @@ func TestSenderPostBad(t *testing.T) {
{"http://10.0.0.1", http.StatusCreated, nil},
}
for i, tt := range tests {
shouldstop := make(chan struct{})
p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, make(chan error))
err := p.post([]byte("some data"))
p.Stop()
@ -155,7 +154,7 @@ func TestSenderPostBad(t *testing.T) {
}
}
func TestSenderPostShouldStop(t *testing.T) {
func TestPeerPostErrorc(t *testing.T) {
tests := []struct {
u string
code int
@ -165,14 +164,14 @@ func TestSenderPostShouldStop(t *testing.T) {
{"http://10.0.0.1", http.StatusPreconditionFailed, nil},
}
for i, tt := range tests {
shouldstop := make(chan struct{}, 1)
p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
errorc := make(chan error, 1)
p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, errorc)
p.post([]byte("some data"))
p.Stop()
select {
case <-shouldstop:
case <-errorc:
default:
t.Fatalf("#%d: cannot receive shouldstop notification", i)
t.Fatalf("#%d: cannot receive from errorc", i)
}
}
}

View File

@ -25,7 +25,6 @@ type Transporter interface {
RemovePeer(id types.ID)
UpdatePeer(id types.ID, urls []string)
Stop()
ShouldStopNotify() <-chan struct{}
}
type transport struct {
@ -36,12 +35,12 @@ type transport struct {
serverStats *stats.ServerStats
leaderStats *stats.LeaderStats
mu sync.RWMutex // protect the peer map
peers map[types.ID]*peer // remote peers
shouldstop chan struct{}
mu sync.RWMutex // protect the peer map
peers map[types.ID]*peer // remote peers
errorc chan error
}
func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
return &transport{
roundTripper: rt,
id: id,
@ -50,7 +49,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, ss *stats.Se
serverStats: ss,
leaderStats: ls,
peers: make(map[types.ID]*peer),
shouldstop: make(chan struct{}, 1),
errorc: errorc,
}
}
@ -99,10 +98,6 @@ func (t *transport) Stop() {
}
}
func (t *transport) ShouldStopNotify() <-chan struct{} {
return t.shouldstop
}
func (t *transport) AddPeer(id types.ID, urls []string) {
t.mu.Lock()
defer t.mu.Unlock()
@ -117,8 +112,7 @@ func (t *transport) AddPeer(id types.ID, urls []string) {
}
u.Path = path.Join(u.Path, RaftPrefix)
fs := t.leaderStats.Follower(id.String())
t.peers[id] = NewPeer(t.roundTripper, u.String(), id, t.clusterID,
t.raft, fs, t.shouldstop)
t.peers[id] = NewPeer(t.roundTripper, u.String(), id, t.clusterID, t.raft, fs, t.errorc)
}
func (t *transport) RemovePeer(id types.ID) {

View File

@ -64,27 +64,27 @@ func TestTransportRemove(t *testing.T) {
}
}
func TestTransportShouldStop(t *testing.T) {
func TestTransportErrorc(t *testing.T) {
errorc := make(chan error, 1)
tr := &transport{
roundTripper: newRespRoundTripper(http.StatusForbidden, nil),
leaderStats: stats.NewLeaderStats(""),
peers: make(map[types.ID]*peer),
shouldstop: make(chan struct{}, 1),
errorc: errorc,
}
tr.AddPeer(1, []string{"http://a"})
shouldstop := tr.ShouldStopNotify()
select {
case <-shouldstop:
t.Fatalf("received unexpected shouldstop notification")
case <-errorc:
t.Fatalf("received unexpected from errorc")
case <-time.After(10 * time.Millisecond):
}
tr.peers[1].Send(raftpb.Message{})
testutil.ForceGosched()
select {
case <-shouldstop:
case <-errorc:
default:
t.Fatalf("cannot receive stop notification")
t.Fatalf("cannot receive error from errorc")
}
}