rafthttp: set the API boundary of the package

This commit is contained in:
Yicheng Qin 2014-11-25 21:38:57 -08:00
parent 0630f42e7a
commit 08f839e32c
7 changed files with 166 additions and 131 deletions

View File

@ -31,16 +31,13 @@ const (
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
rh := rafthttp.NewHandler(server, server.Cluster.ID())
rsh := rafthttp.NewStreamHandler(server.SenderFinder(), server.ID(), server.Cluster.ID())
mh := &peerMembersHandler{
clusterInfo: server.Cluster,
}
mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(rafthttp.RaftPrefix, rh)
mux.Handle(rafthttp.RaftStreamPrefix+"/", rsh)
mux.Handle(rafthttp.RaftPrefix, server.RaftHandler())
mux.Handle(peerMembersPrefix, mh)
return mux
}

View File

@ -144,11 +144,11 @@ type EtcdServer struct {
stats *stats.ServerStats
lstats *stats.LeaderStats
// sender specifies the sender to send msgs to members. sending msgs
// MUST NOT block. It is okay to drop messages, since clients should
// timeout and reissue their messages. If send is nil, server will
// panic.
sendhub SendHub
// transport specifies the transport to send and receive msgs to members.
// Sending messages MUST NOT block. It is okay to drop messages, since
// clients should timeout and reissue their messages.
// If transport is nil, server will panic.
transport rafthttp.Transporter
Ticker <-chan time.Time
SyncTicker <-chan time.Time
@ -271,13 +271,22 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
SyncTicker: time.Tick(500 * time.Millisecond),
snapCount: cfg.SnapCount,
}
srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats)
tr := &rafthttp.Transport{
RoundTripper: cfg.Transport,
ID: id,
ClusterID: cfg.Cluster.ID(),
Processor: srv,
ServerStats: sstats,
LeaderStats: lstats,
}
tr.Start()
// add all the remote members into sendhub
for _, m := range cfg.Cluster.Members() {
if m.Name != cfg.Name {
srv.sendhub.Add(m)
tr.AddPeer(m.ID, m.PeerURLs)
}
}
srv.transport = tr
return srv, nil
}
@ -327,7 +336,7 @@ func (s *EtcdServer) purgeFile() {
func (s *EtcdServer) ID() types.ID { return s.id }
func (s *EtcdServer) SenderFinder() rafthttp.SenderFinder { return s.sendhub }
func (s *EtcdServer) RaftHandler() http.Handler { return s.transport.Handler() }
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.Cluster.IsIDRemoved(types.ID(m.From)) {
@ -343,7 +352,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
func (s *EtcdServer) run() {
var syncC <-chan time.Time
var shouldstop bool
shouldstopC := s.sendhub.ShouldStopNotify()
shouldstopC := s.transport.ShouldStopNotify()
// load initial state from raft storage
snap, err := s.raftStorage.Snapshot()
@ -357,7 +366,7 @@ func (s *EtcdServer) run() {
defer func() {
s.node.Stop()
s.sendhub.Stop()
s.transport.Stop()
if err := s.storage.Close(); err != nil {
log.Panicf("etcdserver: close storage error: %v", err)
}
@ -397,7 +406,7 @@ func (s *EtcdServer) run() {
}
s.raftStorage.Append(rd.Entries)
s.sendhub.Send(rd.Messages)
s.send(rd.Messages)
// recover from snapshot if it is more updated than current applied
if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > appliedi {
@ -663,6 +672,15 @@ func getExpirationTime(r *pb.Request) time.Time {
return t
}
func (s *EtcdServer) send(ms []raftpb.Message) {
for _, m := range ms {
if !s.Cluster.IsIDRemoved(types.ID(m.To)) {
m.To = 0
}
}
s.transport.Send(ms)
}
// apply takes entries received from Raft (after it has been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
@ -764,7 +782,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if m.ID == s.id {
log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
} else {
s.sendhub.Add(m)
s.transport.AddPeer(m.ID, m.PeerURLs)
log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
}
case raftpb.ConfChangeRemoveNode:
@ -775,7 +793,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
return true, nil
} else {
s.sendhub.Remove(id)
s.transport.RemovePeer(id)
log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
}
case raftpb.ConfChangeUpdateNode:
@ -790,7 +808,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if m.ID == s.id {
log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
} else {
s.sendhub.Update(m)
s.transport.UpdatePeer(m.ID, m.PeerURLs)
log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
}
}
@ -831,13 +849,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
// for testing
func (s *EtcdServer) PauseSending() {
hub := s.sendhub.(*sendHub)
hub.pause()
hub := s.transport.(*rafthttp.Transport)
hub.Pause()
}
func (s *EtcdServer) ResumeSending() {
hub := s.sendhub.(*sendHub)
hub.resume()
hub := s.transport.(*rafthttp.Transport)
hub.Resume()
}
func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {

View File

@ -22,6 +22,7 @@ import (
"io/ioutil"
"log"
"math/rand"
"net/http"
"path"
"reflect"
"strconv"
@ -499,10 +500,10 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
cl.AddMember(&Member{ID: types.ID(i)})
}
srv := &EtcdServer{
id: 1,
node: &nodeRecorder{},
Cluster: cl,
sendhub: &nopSender{},
id: 1,
node: &nodeRecorder{},
Cluster: cl,
transport: &nopTransporter{},
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
@ -531,21 +532,24 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
func TestClusterOf1(t *testing.T) { testServer(t, 1) }
func TestClusterOf3(t *testing.T) { testServer(t, 3) }
type fakeSender struct {
type fakeTransporter struct {
ss []*EtcdServer
}
func (s *fakeSender) Sender(id types.ID) rafthttp.Sender { return nil }
func (s *fakeSender) Send(msgs []raftpb.Message) {
func (s *fakeTransporter) Handler() http.Handler { return nil }
func (s *fakeTransporter) Sender(id types.ID) rafthttp.Sender { return nil }
func (s *fakeTransporter) Send(msgs []raftpb.Message) {
for _, m := range msgs {
s.ss[m.To-1].node.Step(context.TODO(), m)
}
}
func (s *fakeSender) Add(m *Member) {}
func (s *fakeSender) Update(m *Member) {}
func (s *fakeSender) Remove(id types.ID) {}
func (s *fakeSender) Stop() {}
func (s *fakeSender) ShouldStopNotify() <-chan struct{} { return nil }
func (s *fakeTransporter) AddPeer(id types.ID, us []string) {}
func (s *fakeTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *fakeTransporter) RemovePeer(id types.ID) {}
func (s *fakeTransporter) Stop() {}
func (s *fakeTransporter) ShouldStopNotify() <-chan struct{} { return nil }
func (s *fakeTransporter) Pause() {}
func (s *fakeTransporter) Resume() {}
func testServer(t *testing.T, ns uint64) {
ctx, cancel := context.WithCancel(context.Background())
@ -571,7 +575,7 @@ func testServer(t *testing.T, ns uint64) {
node: n,
raftStorage: s,
store: st,
sendhub: &fakeSender{ss},
transport: &fakeTransporter{ss},
storage: &storageRecorder{},
Ticker: tk.C,
Cluster: cl,
@ -646,7 +650,7 @@ func TestDoProposal(t *testing.T) {
node: n,
raftStorage: s,
store: st,
sendhub: &nopSender{},
transport: &nopTransporter{},
storage: &storageRecorder{},
Ticker: tk,
Cluster: cl,
@ -735,7 +739,7 @@ func TestDoProposalStopped(t *testing.T) {
node: n,
raftStorage: s,
store: st,
sendhub: &nopSender{},
transport: &nopTransporter{},
storage: &storageRecorder{},
Ticker: tk,
Cluster: cl,
@ -847,7 +851,7 @@ func TestSyncTrigger(t *testing.T) {
node: n,
raftStorage: raft.NewMemoryStorage(),
store: &storeRecorder{},
sendhub: &nopSender{},
transport: &nopTransporter{},
storage: &storageRecorder{},
SyncTicker: st,
}
@ -933,7 +937,7 @@ func TestTriggerSnap(t *testing.T) {
cl.SetStore(store.New())
srv := &EtcdServer{
store: st,
sendhub: &nopSender{},
transport: &nopTransporter{},
storage: p,
node: n,
raftStorage: s,
@ -973,7 +977,7 @@ func TestRecvSnapshot(t *testing.T) {
cl.SetStore(store.New())
s := &EtcdServer{
store: st,
sendhub: &nopSender{},
transport: &nopTransporter{},
storage: p,
node: n,
raftStorage: raft.NewMemoryStorage(),
@ -1006,7 +1010,7 @@ func TestRecvSlowSnapshot(t *testing.T) {
cl.SetStore(store.New())
s := &EtcdServer{
store: st,
sendhub: &nopSender{},
transport: &nopTransporter{},
storage: &storageRecorder{},
node: n,
raftStorage: raft.NewMemoryStorage(),
@ -1039,7 +1043,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
storage := raft.NewMemoryStorage()
s := &EtcdServer{
store: st,
sendhub: &nopSender{},
transport: &nopTransporter{},
storage: &storageRecorder{},
node: n,
raftStorage: storage,
@ -1082,7 +1086,7 @@ func TestAddMember(t *testing.T) {
node: n,
raftStorage: raft.NewMemoryStorage(),
store: &storeRecorder{},
sendhub: &nopSender{},
transport: &nopTransporter{},
storage: &storageRecorder{},
Cluster: cl,
}
@ -1117,7 +1121,7 @@ func TestRemoveMember(t *testing.T) {
node: n,
raftStorage: raft.NewMemoryStorage(),
store: &storeRecorder{},
sendhub: &nopSender{},
transport: &nopTransporter{},
storage: &storageRecorder{},
Cluster: cl,
}
@ -1151,7 +1155,7 @@ func TestUpdateMember(t *testing.T) {
node: n,
raftStorage: raft.NewMemoryStorage(),
store: &storeRecorder{},
sendhub: &nopSender{},
transport: &nopTransporter{},
storage: &storageRecorder{},
Cluster: cl,
}
@ -1219,12 +1223,12 @@ func TestPublish(t *testing.T) {
// TestPublishStopped tests that publish will be stopped if server is stopped.
func TestPublishStopped(t *testing.T) {
srv := &EtcdServer{
node: &nodeRecorder{},
sendhub: &nopSender{},
Cluster: &Cluster{},
w: &waitRecorder{},
done: make(chan struct{}),
stop: make(chan struct{}),
node: &nodeRecorder{},
transport: &nopTransporter{},
Cluster: &Cluster{},
w: &waitRecorder{},
done: make(chan struct{}),
stop: make(chan struct{}),
}
close(srv.done)
srv.publish(time.Hour)
@ -1625,15 +1629,18 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
}
func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
type nopSender struct{}
type nopTransporter struct{}
func (s *nopSender) Sender(id types.ID) rafthttp.Sender { return nil }
func (s *nopSender) Send(m []raftpb.Message) {}
func (s *nopSender) Add(m *Member) {}
func (s *nopSender) Remove(id types.ID) {}
func (s *nopSender) Update(m *Member) {}
func (s *nopSender) Stop() {}
func (s *nopSender) ShouldStopNotify() <-chan struct{} { return nil }
func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Sender(id types.ID) rafthttp.Sender { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporter) Stop() {}
func (s *nopTransporter) ShouldStopNotify() <-chan struct{} { return nil }
func (s *nopTransporter) Pause() {}
func (s *nopTransporter) Resume() {}
func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
peers := make([]raft.Peer, len(ids))

View File

@ -40,10 +40,6 @@ var (
RaftStreamPrefix = path.Join(RaftPrefix, "stream")
)
type Processor interface {
Process(ctx context.Context, m raftpb.Message) error
}
type SenderFinder interface {
// Sender returns the sender of the given id.
Sender(id types.ID) Sender

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package etcdserver
package rafthttp
import (
"log"
@ -26,50 +26,39 @@ import (
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
)
const (
raftPrefix = "/raft"
)
type SendHub interface {
rafthttp.SenderFinder
Send(m []raftpb.Message)
Add(m *Member)
Remove(id types.ID)
Update(m *Member)
Stop()
ShouldStopNotify() <-chan struct{}
}
type sendHub struct {
tr http.RoundTripper
cl ClusterInfo
p rafthttp.Processor
cid types.ID
p Processor
ss *stats.ServerStats
ls *stats.LeaderStats
mu sync.RWMutex // protect the sender map
senders map[types.ID]rafthttp.Sender
senders map[types.ID]Sender
shouldstop chan struct{}
}
// newSendHub creates the default send hub used to transport raft messages
// to other members. The returned sendHub will update the given ServerStats and
// LeaderStats appropriately.
func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
func newSendHub(t http.RoundTripper, cid types.ID, p Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
return &sendHub{
tr: t,
cl: cl,
cid: cid,
p: p,
ss: ss,
ls: ls,
senders: make(map[types.ID]rafthttp.Sender),
senders: make(map[types.ID]Sender),
shouldstop: make(chan struct{}, 1),
}
}
func (h *sendHub) Sender(id types.ID) rafthttp.Sender {
func (h *sendHub) Sender(id types.ID) Sender {
h.mu.RLock()
defer h.mu.RUnlock()
return h.senders[id]
@ -77,12 +66,14 @@ func (h *sendHub) Sender(id types.ID) rafthttp.Sender {
func (h *sendHub) Send(msgs []raftpb.Message) {
for _, m := range msgs {
// intentionally dropped message
if m.To == 0 {
continue
}
to := types.ID(m.To)
s, ok := h.senders[to]
if !ok {
if !h.cl.IsIDRemoved(to) {
log.Printf("etcdserver: send message to unknown receiver %s", to)
}
log.Printf("etcdserver: send message to unknown receiver %s", to)
continue
}
@ -107,55 +98,55 @@ func (h *sendHub) ShouldStopNotify() <-chan struct{} {
return h.shouldstop
}
func (h *sendHub) Add(m *Member) {
func (h *sendHub) AddPeer(id types.ID, urls []string) {
h.mu.Lock()
defer h.mu.Unlock()
if _, ok := h.senders[m.ID]; ok {
if _, ok := h.senders[id]; ok {
return
}
// TODO: considering how to switch between all available peer urls
peerURL := m.PickPeerURL()
peerURL := urls[0]
u, err := url.Parse(peerURL)
if err != nil {
log.Panicf("unexpect peer url %s", peerURL)
}
u.Path = path.Join(u.Path, raftPrefix)
fs := h.ls.Follower(m.ID.String())
s := rafthttp.NewSender(h.tr, u.String(), m.ID, h.cl.ID(), h.p, fs, h.shouldstop)
h.senders[m.ID] = s
fs := h.ls.Follower(id.String())
s := NewSender(h.tr, u.String(), id, h.cid, h.p, fs, h.shouldstop)
h.senders[id] = s
}
func (h *sendHub) Remove(id types.ID) {
func (h *sendHub) RemovePeer(id types.ID) {
h.mu.Lock()
defer h.mu.Unlock()
h.senders[id].Stop()
delete(h.senders, id)
}
func (h *sendHub) Update(m *Member) {
func (h *sendHub) UpdatePeer(id types.ID, urls []string) {
h.mu.Lock()
defer h.mu.Unlock()
// TODO: return error or just panic?
if _, ok := h.senders[m.ID]; !ok {
if _, ok := h.senders[id]; !ok {
return
}
peerURL := m.PickPeerURL()
peerURL := urls[0]
u, err := url.Parse(peerURL)
if err != nil {
log.Panicf("unexpect peer url %s", peerURL)
}
u.Path = path.Join(u.Path, raftPrefix)
h.senders[m.ID].Update(u.String())
h.senders[id].Update(u.String())
}
// for testing
func (h *sendHub) pause() {
func (h *sendHub) Pause() {
for _, s := range h.senders {
s.Pause()
}
}
func (h *sendHub) resume() {
func (h *sendHub) Resume() {
for _, s := range h.senders {
s.Resume()
}

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package etcdserver
package rafthttp
import (
"net/http"
@ -28,11 +28,9 @@ import (
)
func TestSendHubAdd(t *testing.T) {
cl := newTestCluster(nil)
ls := stats.NewLeaderStats("")
h := newSendHub(nil, cl, nil, nil, ls)
m := newTestMember(1, []string{"http://a"}, "", nil)
h.Add(m)
h := newSendHub(nil, 0, nil, nil, ls)
h.AddPeer(1, []string{"http://a"})
if _, ok := ls.Followers["1"]; !ok {
t.Errorf("FollowerStats[1] is nil, want exists")
@ -42,20 +40,18 @@ func TestSendHubAdd(t *testing.T) {
t.Fatalf("senders[1] is nil, want exists")
}
h.Add(m)
h.AddPeer(1, []string{"http://a"})
ns := h.senders[types.ID(1)]
if s != ns {
t.Errorf("sender = %p, want %p", ns, s)
t.Errorf("sender = %v, want %v", ns, s)
}
}
func TestSendHubRemove(t *testing.T) {
cl := newTestCluster(nil)
ls := stats.NewLeaderStats("")
h := newSendHub(nil, cl, nil, nil, ls)
m := newTestMember(1, []string{"http://a"}, "", nil)
h.Add(m)
h.Remove(types.ID(1))
h := newSendHub(nil, 0, nil, nil, ls)
h.AddPeer(1, []string{"http://a"})
h.RemovePeer(types.ID(1))
if _, ok := h.senders[types.ID(1)]; ok {
t.Fatalf("senders[1] exists, want removed")
@ -64,11 +60,9 @@ func TestSendHubRemove(t *testing.T) {
func TestSendHubShouldStop(t *testing.T) {
tr := newRespRoundTripper(http.StatusForbidden, nil)
cl := newTestCluster(nil)
ls := stats.NewLeaderStats("")
h := newSendHub(tr, cl, nil, nil, ls)
m := newTestMember(1, []string{"http://a"}, "", nil)
h.Add(m)
h := newSendHub(tr, 0, nil, nil, ls)
h.AddPeer(1, []string{"http://a"})
shouldstop := h.ShouldStopNotify()
select {
@ -85,20 +79,3 @@ func TestSendHubShouldStop(t *testing.T) {
t.Fatalf("cannot receive stop notification")
}
}
type respRoundTripper struct {
code int
err error
}
func newRespRoundTripper(code int, err error) *respRoundTripper {
return &respRoundTripper{code: code, err: err}
}
func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err
}
type nopReadCloser struct{}
func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil }
func (n *nopReadCloser) Close() error { return nil }

49
rafthttp/transport.go Normal file
View File

@ -0,0 +1,49 @@
package rafthttp
import (
"net/http"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
)
type Processor interface {
Process(ctx context.Context, m raftpb.Message) error
}
type Transporter interface {
Handler() http.Handler
Send(m []raftpb.Message)
AddPeer(id types.ID, urls []string)
RemovePeer(id types.ID)
UpdatePeer(id types.ID, urls []string)
Stop()
ShouldStopNotify() <-chan struct{}
}
type Transport struct {
RoundTripper http.RoundTripper
ID types.ID
ClusterID types.ID
Processor Processor
ServerStats *stats.ServerStats
LeaderStats *stats.LeaderStats
*sendHub
handler http.Handler
}
func (t *Transport) Start() {
t.sendHub = newSendHub(t.RoundTripper, t.ClusterID, t.Processor, t.ServerStats, t.LeaderStats)
h := NewHandler(t.Processor, t.ClusterID)
sh := NewStreamHandler(t.sendHub, t.ID, t.ClusterID)
mux := http.NewServeMux()
mux.Handle(RaftPrefix, h)
mux.Handle(RaftStreamPrefix+"/", sh)
t.handler = mux
}
func (t *Transport) Handler() http.Handler { return t.handler }