From 449cad465826045bbe7dffb818e00e8def912a74 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sun, 18 Aug 2013 19:43:13 -0700 Subject: [PATCH 1/7] bump(github.com/coreos/go-raft): bb7f7ec92e4cb6d98241cea83f55d0e85e624189 --- .../github.com/coreos/go-raft/README.md | 56 +++- .../github.com/coreos/go-raft/config.go | 7 + .../coreos/go-raft/http_transporter.go | 4 +- .../github.com/coreos/go-raft/join_command.go | 5 +- third_party/github.com/coreos/go-raft/log.go | 9 + third_party/github.com/coreos/go-raft/peer.go | 100 +++--- .../protobuf/snapshot_recovery_request.pb.go | 40 ++- .../protobuf/snapshot_recovery_request.proto | 10 +- .../github.com/coreos/go-raft/server.go | 291 +++++++++--------- .../github.com/coreos/go-raft/server_test.go | 126 +++++++- .../github.com/coreos/go-raft/snapshot.go | 6 +- .../go-raft/snapshot_recovery_request.go | 26 +- third_party/github.com/coreos/go-raft/test.go | 7 +- 13 files changed, 461 insertions(+), 226 deletions(-) create mode 100644 third_party/github.com/coreos/go-raft/config.go diff --git a/third_party/github.com/coreos/go-raft/README.md b/third_party/github.com/coreos/go-raft/README.md index f948a5e9c..6ecc74866 100644 --- a/third_party/github.com/coreos/go-raft/README.md +++ b/third_party/github.com/coreos/go-raft/README.md @@ -1,19 +1,49 @@ -[![Build Status](https://travis-ci.org/benbjohnson/go-raft.png?branch=master)](https://travis-ci.org/benbjohnson/go-raft) - go-raft ======= +[![Build Status](https://travis-ci.org/goraft/raft.png?branch=master)](https://travis-ci.org/goraft/raft) + ## Overview -This is an Go implementation of the Raft distributed consensus protocol. +This is a Go implementation of the Raft distributed consensus protocol. Raft is a protocol by which a cluster of nodes can maintain a replicated state machine. The state machine is kept in sync through the use of a replicated log. -For more details on Raft, you can read [In Search of an Understandable Consensus Algorithm](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) by Diego Ongaro and John Ousterhout. +For more details on Raft, you can read [In Search of an Understandable Consensus Algorithm][raft-paper] by Diego Ongaro and John Ousterhout. +## Project Status + +This library is feature complete but should be considered experimental until it has seen more usage. +If you have any questions on implementing go-raft in your project please file an issue. +There is an [active community][community] of developers who can help. +go-raft is under the MIT license. + +[community]: https://github.com/goraft/raft/contributors + +### Features + +- Leader election +- Log replication +- Configuration changes +- Log compaction +- Unit tests +- Fast Protobuf Log Encoding +- HTTP transport + +### Projects + +These projects are built on go-raft: + +- [coreos/etcd](https://github.com/coreos/etcd) - A highly-available key value store for shared configuration and service discovery +- [benbjohnson/raftd](https://github.com/benbjohnson/raftd) - A reference implementation for using the go-raft library for distributed consensus. + +If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples. ## The Raft Protocol +This section provides a summary of the Raft protocol from a high level. +For a more detailed explanation on the failover process and election terms please see the full paper describing the protocol: [In Search of an Understandable Consensus Algorithm][raft-paper]. + ### Overview Maintaining state in a single process on a single server is easy. @@ -26,7 +56,7 @@ Servers can crash or the network between two machines can become unavailable or A distributed consensus protocol is used for maintaining a consistent state across multiple servers in a cluster. Many distributed systems are built upon the Paxos protocol but Paxos can be difficult to understand and there are many gaps between Paxos and real world implementation. -An alternative is the [Raft distributed consensus protocol](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) by Diego Ongaro and John Ousterhout. +An alternative is the [Raft distributed consensus protocol][raft-paper] by Diego Ongaro and John Ousterhout. Raft is a protocol built with understandability as a primary tenant and it centers around two things: 1. Leader Election @@ -53,17 +83,9 @@ By ensuring that this log is replicated identically between all the nodes in the Replicating the log under normal conditions is done by sending an `AppendEntries` RPC from the leader to each of the other servers in the cluster (called Peers). Each peer will append the entries from the leader through a 2-phase commit process which ensure that a majority of servers in the cluster have entries written to log. -For a more detailed explanation on the failover process and election terms please see the full paper describing the protocol: [In Search of an Understandable Consensus Algorithm](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) +## History +Ben Johnson started this library for use in his behavioral analytics database called [Sky](https://github.com/skydb/sky). +He put it under the MIT license in the hopes that it would be useful for other projects too. -## Project Status - -The go-raft library is feature complete but in alpha. -There is a reference implementation called [raftd](https://github.com/benbjohnson/raftd) that demonstrates how to use the library - -The library will be considered experimental until it has significant production usage. -I'm writing the library for the purpose of including distributed processing in my behavioral analytics database called [Sky](https://github.com/skydb/sky). -However, I hope other projects can benefit from having a distributed consensus protocol so the go-raft library is available under MIT license. - -If you have a project that you're using go-raft in, please add it to this README and send a pull request so others can see implementation examples. -If you have any questions on implementing go-raft in your project, feel free to contact me on [GitHub](https://github.com/benbjohnson), [Twitter](https://twitter.com/benbjohnson) or by e-mail at [ben@skylandlabs.com](mailto:ben@skylandlabs.com). +[raft-paper]: https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf diff --git a/third_party/github.com/coreos/go-raft/config.go b/third_party/github.com/coreos/go-raft/config.go new file mode 100644 index 000000000..d202dea0d --- /dev/null +++ b/third_party/github.com/coreos/go-raft/config.go @@ -0,0 +1,7 @@ +package raft + +type Config struct { + CommitIndex uint64 `json:"commitIndex"` + // TODO decide what we need to store in peer struct + Peers []*Peer `json:"peers"` +} diff --git a/third_party/github.com/coreos/go-raft/http_transporter.go b/third_party/github.com/coreos/go-raft/http_transporter.go index 1125f91f5..7dbcf5a40 100644 --- a/third_party/github.com/coreos/go-raft/http_transporter.go +++ b/third_party/github.com/coreos/go-raft/http_transporter.go @@ -94,7 +94,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r return nil } - url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath()) + url := fmt.Sprintf("http://%s%s", peer.Name, t.AppendEntriesPath()) traceln(server.Name(), "POST", url) client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}} @@ -122,7 +122,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque return nil } - url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath()) + url := fmt.Sprintf("http://%s%s", peer.Name, t.RequestVotePath()) traceln(server.Name(), "POST", url) client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}} diff --git a/third_party/github.com/coreos/go-raft/join_command.go b/third_party/github.com/coreos/go-raft/join_command.go index 74e14239d..949eaae76 100644 --- a/third_party/github.com/coreos/go-raft/join_command.go +++ b/third_party/github.com/coreos/go-raft/join_command.go @@ -9,7 +9,8 @@ type JoinCommand interface { // Join command type DefaultJoinCommand struct { - Name string `json:"name"` + Name string `json:"name"` + ConnectionString string `json:"connectionString"` } // The name of the Join command in the log @@ -18,7 +19,7 @@ func (c *DefaultJoinCommand) CommandName() string { } func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) { - err := server.AddPeer(c.Name) + err := server.AddPeer(c.Name, c.ConnectionString) return []byte("join"), err } diff --git a/third_party/github.com/coreos/go-raft/log.go b/third_party/github.com/coreos/go-raft/log.go index 42553f24e..b686d317c 100644 --- a/third_party/github.com/coreos/go-raft/log.go +++ b/third_party/github.com/coreos/go-raft/log.go @@ -183,6 +183,15 @@ func (l *Log) open(path string) error { // Append entry. l.entries = append(l.entries, entry) + + if entry.Index <= l.commitIndex { + command, err := newCommand(entry.CommandName, entry.Command) + if err != nil { + continue + } + l.ApplyFunc(command) + } + debugln("open.log.append log index ", entry.Index) readBytes += int64(n) diff --git a/third_party/github.com/coreos/go-raft/peer.go b/third_party/github.com/coreos/go-raft/peer.go index e7761dd97..37b8c3fb7 100644 --- a/third_party/github.com/coreos/go-raft/peer.go +++ b/third_party/github.com/coreos/go-raft/peer.go @@ -14,7 +14,8 @@ import ( // A peer is a reference to another server involved in the consensus protocol. type Peer struct { server *Server - name string + Name string `json:"name"` + ConnectionString string `json:"connectionString"` prevLogIndex uint64 mutex sync.RWMutex stopChan chan bool @@ -28,10 +29,11 @@ type Peer struct { //------------------------------------------------------------------------------ // Creates a new peer. -func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer { +func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer { return &Peer{ server: server, - name: name, + Name: name, + ConnectionString: connectionString, heartbeatTimeout: heartbeatTimeout, } } @@ -42,11 +44,6 @@ func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer // //------------------------------------------------------------------------------ -// Retrieves the name of the peer. -func (p *Peer) Name() string { - return p.name -} - // Sets the heartbeat timeout. func (p *Peer) setHeartbeatTimeout(duration time.Duration) { p.heartbeatTimeout = duration @@ -89,17 +86,17 @@ func (p *Peer) startHeartbeat() { } // Stops the peer heartbeat. -func (p *Peer) stopHeartbeat() { +func (p *Peer) stopHeartbeat(flush bool) { // here is a problem // the previous stop is no buffer leader may get blocked - // when heartbeat returns at line 132 + // when heartbeat returns // I make the channel with 1 buffer // and try to panic here select { - case p.stopChan <- true: + case p.stopChan <- flush: default: - panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat") + panic("[" + p.server.Name() + "] cannot stop [" + p.Name + "] heartbeat") } } @@ -113,8 +110,9 @@ func (p *Peer) clone() *Peer { p.mutex.Lock() defer p.mutex.Unlock() return &Peer{ - name: p.name, - prevLogIndex: p.prevLogIndex, + Name: p.Name, + ConnectionString: p.ConnectionString, + prevLogIndex: p.prevLogIndex, } } @@ -128,46 +126,58 @@ func (p *Peer) heartbeat(c chan bool) { c <- true - debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout) + debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout) for { select { - case <-stopChan: - debugln("peer.heartbeat.stop: ", p.Name()) - return - - case <-time.After(p.heartbeatTimeout): - debugln("peer.heartbeat.run: ", p.Name()) - prevLogIndex := p.getPrevLogIndex() - entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest) - - if p.server.State() != Leader { + case flush := <-stopChan: + if !flush { + debugln("peer.heartbeat.stop: ", p.Name) + return + } else { + // before we can safely remove a node + // we must flush the remove command to the node first + p.flush() + debugln("peer.heartbeat.stop: ", p.Name) return } - if entries != nil { - p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries)) - } else { - p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot)) - } + case <-time.After(p.heartbeatTimeout): + p.flush() } } } +func (p *Peer) flush() { + debugln("peer.heartbeat.run: ", p.Name) + prevLogIndex := p.getPrevLogIndex() + entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest) + + if p.server.State() != Leader { + return + } + + if entries != nil { + p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries)) + } else { + p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot)) + } +} + //-------------------------------------- // Append Entries //-------------------------------------- // Sends an AppendEntries request to the peer through the transport. func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { - traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries)) + traceln("peer.flush.send: ", p.server.Name(), "->", p.Name, " ", len(req.Entries)) resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req) if resp == nil { - debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name()) + debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name) return } - traceln("peer.flush.recv: ", p.Name()) + traceln("peer.flush.recv: ", p.Name) // If successful then update the previous log index. p.mutex.Lock() @@ -181,7 +191,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { resp.append = true } } - traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex) + traceln("peer.flush.success: ", p.server.Name(), "->", p.Name, "; idx =", p.prevLogIndex) // If it was unsuccessful then decrement the previous log index and // we'll try again next time. @@ -195,7 +205,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { p.prevLogIndex = resp.CommitIndex - debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex) + debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex) } else if p.prevLogIndex > 0 { // Decrement the previous log index down until we find a match. Don't // let it go below where the peer's commit index is though. That's a @@ -206,35 +216,35 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { p.prevLogIndex = resp.Index } - debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex) + debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex) } } p.mutex.Unlock() // Attach the peer to resp, thus server can know where it comes from - resp.peer = p.Name() + resp.peer = p.Name // Send response to server for processing. p.server.send(resp) } // Sends an Snapshot request to the peer through the transport. func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) { - debugln("peer.snap.send: ", p.name) + debugln("peer.snap.send: ", p.Name) resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req) if resp == nil { - debugln("peer.snap.timeout: ", p.name) + debugln("peer.snap.timeout: ", p.Name) return } - debugln("peer.snap.recv: ", p.name) + debugln("peer.snap.recv: ", p.Name) // If successful, the peer should have been to snapshot state // Send it the snapshot! if resp.Success { p.sendSnapshotRecoveryRequest() } else { - debugln("peer.snap.failed: ", p.name) + debugln("peer.snap.failed: ", p.Name) return } @@ -243,12 +253,12 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) { // Sends an Snapshot Recovery request to the peer through the transport. func (p *Peer) sendSnapshotRecoveryRequest() { req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot) - debugln("peer.snap.recovery.send: ", p.name) + debugln("peer.snap.recovery.send: ", p.Name) resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req) if resp.Success { p.prevLogIndex = req.LastIndex } else { - debugln("peer.snap.recovery.failed: ", p.name) + debugln("peer.snap.recovery.failed: ", p.Name) return } // Send response to server for processing. @@ -261,10 +271,10 @@ func (p *Peer) sendSnapshotRecoveryRequest() { // send VoteRequest Request func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) { - debugln("peer.vote: ", p.server.Name(), "->", p.Name()) + debugln("peer.vote: ", p.server.Name(), "->", p.Name) req.peer = p if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil { - debugln("peer.vote: recv", p.server.Name(), "<-", p.Name()) + debugln("peer.vote: recv", p.server.Name(), "<-", p.Name) resp.peer = p c <- resp } diff --git a/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.pb.go b/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.pb.go index f580de6ab..22a281236 100644 --- a/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.pb.go +++ b/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.pb.go @@ -14,12 +14,12 @@ var _ = &json.SyntaxError{} var _ = math.Inf type ProtoSnapshotRecoveryRequest struct { - LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"` - LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"` - LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"` - Peers []string `protobuf:"bytes,4,rep" json:"Peers,omitempty"` - State []byte `protobuf:"bytes,5,req" json:"State,omitempty"` - XXX_unrecognized []byte `json:"-"` + LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"` + LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"` + LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"` + Peers []*ProtoSnapshotRecoveryRequest_ProtoPeer `protobuf:"bytes,4,rep" json:"Peers,omitempty"` + State []byte `protobuf:"bytes,5,req" json:"State,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *ProtoSnapshotRecoveryRequest) Reset() { *m = ProtoSnapshotRecoveryRequest{} } @@ -47,7 +47,7 @@ func (m *ProtoSnapshotRecoveryRequest) GetLastTerm() uint64 { return 0 } -func (m *ProtoSnapshotRecoveryRequest) GetPeers() []string { +func (m *ProtoSnapshotRecoveryRequest) GetPeers() []*ProtoSnapshotRecoveryRequest_ProtoPeer { if m != nil { return m.Peers } @@ -61,5 +61,31 @@ func (m *ProtoSnapshotRecoveryRequest) GetState() []byte { return nil } +type ProtoSnapshotRecoveryRequest_ProtoPeer struct { + Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + ConnectionString *string `protobuf:"bytes,2,req" json:"ConnectionString,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) Reset() { + *m = ProtoSnapshotRecoveryRequest_ProtoPeer{} +} +func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) String() string { return proto.CompactTextString(m) } +func (*ProtoSnapshotRecoveryRequest_ProtoPeer) ProtoMessage() {} + +func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetName() string { + if m != nil && m.Name != nil { + return *m.Name + } + return "" +} + +func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetConnectionString() string { + if m != nil && m.ConnectionString != nil { + return *m.ConnectionString + } + return "" +} + func init() { } diff --git a/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.proto b/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.proto index 000c54d48..e84cca30c 100644 --- a/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.proto +++ b/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.proto @@ -3,7 +3,13 @@ package protobuf; message ProtoSnapshotRecoveryRequest { required string LeaderName=1; required uint64 LastIndex=2; - required uint64 LastTerm=3; - repeated string Peers=4; + required uint64 LastTerm=3; + + message ProtoPeer { + required string Name=1; + required string ConnectionString=2; + } + repeated ProtoPeer Peers=4; + required bytes State=5; } \ No newline at end of file diff --git a/third_party/github.com/coreos/go-raft/server.go b/third_party/github.com/coreos/go-raft/server.go index fbf5c94b2..c3d3fbd46 100644 --- a/third_party/github.com/coreos/go-raft/server.go +++ b/third_party/github.com/coreos/go-raft/server.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "hash/crc32" - "io" "io/ioutil" "os" "path" @@ -81,8 +80,6 @@ type Server struct { lastSnapshot *Snapshot stateMachine StateMachine maxLogEntriesPerRequest uint64 - - confFile *os.File } // An event to be processed by the server's event loop. @@ -272,11 +269,15 @@ func (s *Server) QuorumSize() int { // Retrieves the election timeout. func (s *Server) ElectionTimeout() time.Duration { + s.mutex.RLock() + defer s.mutex.RUnlock() return s.electionTimeout } // Sets the election timeout. func (s *Server) SetElectionTimeout(duration time.Duration) { + s.mutex.Lock() + defer s.mutex.Unlock() s.electionTimeout = duration } @@ -286,6 +287,8 @@ func (s *Server) SetElectionTimeout(duration time.Duration) { // Retrieves the heartbeat timeout. func (s *Server) HeartbeatTimeout() time.Duration { + s.mutex.RLock() + defer s.mutex.RUnlock() return s.heartbeatTimeout } @@ -332,14 +335,14 @@ func (s *Server) Start() error { // Create snapshot directory if not exist os.Mkdir(path.Join(s.path, "snapshot"), 0700) - // Initialize the log and load it up. - if err := s.log.open(s.LogPath()); err != nil { - s.debugln("raft: Log error: ", err) + if err := s.readConf(); err != nil { + s.debugln("raft: Conf file error: ", err) return fmt.Errorf("raft: Initialization error: %s", err) } - if err := s.readConf(); err != nil { - s.debugln("raft: Conf file error: ", err) + // Initialize the log and load it up. + if err := s.log.open(s.LogPath()); err != nil { + s.debugln("raft: Log error: ", err) return fmt.Errorf("raft: Initialization error: %s", err) } @@ -368,59 +371,12 @@ func (s *Server) Start() error { return nil } -// Read the configuration for the server. -func (s *Server) readConf() error { - var err error - confPath := path.Join(s.path, "conf") - s.debugln("readConf.open ", confPath) - // open conf file - s.confFile, err = os.OpenFile(confPath, os.O_RDWR, 0600) - - if err != nil { - if os.IsNotExist(err) { - s.confFile, err = os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE, 0600) - debugln("readConf.create ", confPath) - if err != nil { - return err - } - } - return err - } - - peerNames := make([]string, 0) - - for { - var peerName string - _, err = fmt.Fscanf(s.confFile, "%s\n", &peerName) - - if err != nil { - if err == io.EOF { - s.debugln("server.peer.conf: finish") - break - } - return err - } - s.debugln("server.peer.conf.read: ", peerName) - - peerNames = append(peerNames, peerName) - } - - s.confFile.Truncate(0) - s.confFile.Seek(0, os.SEEK_SET) - - for _, peerName := range peerNames { - s.AddPeer(peerName) - } - - return nil -} - // Shuts down the server. func (s *Server) Stop() { s.send(&stopValue) s.mutex.Lock() + defer s.mutex.Unlock() s.log.close() - s.mutex.Unlock() } // Checks if the server is currently running. @@ -532,24 +488,27 @@ func (s *Server) followerLoop() { case e := <-s.c: if e.target == &stopValue { s.setState(Stopped) - } else if command, ok := e.target.(JoinCommand); ok { - //If no log entries exist and a self-join command is issued - //then immediately become leader and commit entry. - if s.log.currentIndex() == 0 && command.NodeName() == s.Name() { - s.debugln("selfjoin and promote to leader") - s.setState(Leader) - s.processCommand(command, e) - } else { + } else { + switch req := e.target.(type) { + case JoinCommand: + //If no log entries exist and a self-join command is issued + //then immediately become leader and commit entry. + if s.log.currentIndex() == 0 && req.NodeName() == s.Name() { + s.debugln("selfjoin and promote to leader") + s.setState(Leader) + s.processCommand(req, e) + } else { + err = NotLeaderError + } + case *AppendEntriesRequest: + e.returnValue, update = s.processAppendEntriesRequest(req) + case *RequestVoteRequest: + e.returnValue, update = s.processRequestVoteRequest(req) + case *SnapshotRequest: + e.returnValue = s.processSnapshotRequest(req) + default: err = NotLeaderError } - } else if req, ok := e.target.(*AppendEntriesRequest); ok { - e.returnValue, update = s.processAppendEntriesRequest(req) - } else if req, ok := e.target.(*RequestVoteRequest); ok { - e.returnValue, update = s.processRequestVoteRequest(req) - } else if req, ok := e.target.(*SnapshotRequest); ok { - e.returnValue = s.processSnapshotRequest(req) - } else { - err = NotLeaderError } // Callback to event. @@ -629,14 +588,16 @@ func (s *Server) candidateLoop() { var err error if e.target == &stopValue { s.setState(Stopped) - } else if _, ok := e.target.(Command); ok { - err = NotLeaderError - } else if req, ok := e.target.(*AppendEntriesRequest); ok { - e.returnValue, _ = s.processAppendEntriesRequest(req) - } else if req, ok := e.target.(*RequestVoteRequest); ok { - e.returnValue, _ = s.processRequestVoteRequest(req) + } else { + switch req := e.target.(type) { + case Command: + err = NotLeaderError + case *AppendEntriesRequest: + e.returnValue, _ = s.processAppendEntriesRequest(req) + case *RequestVoteRequest: + e.returnValue, _ = s.processRequestVoteRequest(req) + } } - // Callback to event. e.c <- err @@ -660,7 +621,7 @@ func (s *Server) candidateLoop() { } } -// The event loop that is run when the server is in a Candidate state. +// The event loop that is run when the server is in a Leader state. func (s *Server) leaderLoop() { s.setState(Leader) s.syncedPeer = make(map[string]bool) @@ -682,15 +643,18 @@ func (s *Server) leaderLoop() { case e := <-s.c: if e.target == &stopValue { s.setState(Stopped) - } else if command, ok := e.target.(Command); ok { - s.processCommand(command, e) - continue - } else if req, ok := e.target.(*AppendEntriesRequest); ok { - e.returnValue, _ = s.processAppendEntriesRequest(req) - } else if resp, ok := e.target.(*AppendEntriesResponse); ok { - s.processAppendEntriesResponse(resp) - } else if req, ok := e.target.(*RequestVoteRequest); ok { - e.returnValue, _ = s.processRequestVoteRequest(req) + } else { + switch req := e.target.(type) { + case Command: + s.processCommand(req, e) + continue + case *AppendEntriesRequest: + e.returnValue, _ = s.processAppendEntriesRequest(req) + case *AppendEntriesResponse: + s.processAppendEntriesResponse(req) + case *RequestVoteRequest: + e.returnValue, _ = s.processRequestVoteRequest(req) + } } // Callback to event. @@ -705,7 +669,7 @@ func (s *Server) leaderLoop() { // Stop all peers. for _, peer := range s.peers { - peer.stopHeartbeat() + peer.stopHeartbeat(false) } s.syncedPeer = nil } @@ -720,16 +684,18 @@ func (s *Server) snapshotLoop() { if e.target == &stopValue { s.setState(Stopped) - } else if _, ok := e.target.(Command); ok { - err = NotLeaderError - } else if req, ok := e.target.(*AppendEntriesRequest); ok { - e.returnValue, _ = s.processAppendEntriesRequest(req) - } else if req, ok := e.target.(*RequestVoteRequest); ok { - e.returnValue, _ = s.processRequestVoteRequest(req) - } else if req, ok := e.target.(*SnapshotRecoveryRequest); ok { - e.returnValue = s.processSnapshotRecoveryRequest(req) + } else { + switch req := e.target.(type) { + case Command: + err = NotLeaderError + case *AppendEntriesRequest: + e.returnValue, _ = s.processAppendEntriesRequest(req) + case *RequestVoteRequest: + e.returnValue, _ = s.processRequestVoteRequest(req) + case *SnapshotRecoveryRequest: + e.returnValue = s.processSnapshotRecoveryRequest(req) + } } - // Callback to event. e.c <- err @@ -959,31 +925,29 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot //-------------------------------------- // Adds a peer to the server. -func (s *Server) AddPeer(name string) error { +func (s *Server) AddPeer(name string, connectiongString string) error { s.debugln("server.peer.add: ", name, len(s.peers)) - + defer s.writeConf() // Do not allow peers to be added twice. if s.peers[name] != nil { return nil } - // Only add the peer if it doesn't have the same name. - if s.name != name { - // when loading snapshot s.confFile should be nil - if s.confFile != nil { - _, err := fmt.Fprintln(s.confFile, name) - s.debugln("server.peer.conf.write: ", name) - if err != nil { - return err - } - } - peer := newPeer(s, name, s.heartbeatTimeout) - if s.State() == Leader { - peer.startHeartbeat() - } - s.peers[peer.name] = peer + // Skip the Peer if it has the same name as the Server + if s.name == name { + return nil } + peer := newPeer(s, name, connectiongString, s.heartbeatTimeout) + + if s.State() == Leader { + peer.startHeartbeat() + } + + s.peers[peer.Name] = peer + + s.debugln("server.peer.conf.write: ", name) + return nil } @@ -991,8 +955,12 @@ func (s *Server) AddPeer(name string) error { func (s *Server) RemovePeer(name string) error { s.debugln("server.peer.remove: ", name, len(s.peers)) - // Ignore removal of the server itself. - if s.name == name { + defer s.writeConf() + + if name == s.Name() { + // when the removed node restart, it should be able + // to know it has been removed before. So we need + // to update knownCommitIndex return nil } // Return error if peer doesn't exist. @@ -1001,23 +969,13 @@ func (s *Server) RemovePeer(name string) error { return fmt.Errorf("raft: Peer not found: %s", name) } - // TODO: Flush entries to the peer first. - // Stop peer and remove it. - peer.stopHeartbeat() + if s.State() == Leader { + peer.stopHeartbeat(true) + } delete(s.peers, name) - s.confFile.Truncate(0) - s.confFile.Seek(0, os.SEEK_SET) - - for peer := range s.peers { - _, err := fmt.Fprintln(s.confFile, peer) - if err != nil { - return err - } - } - return nil } @@ -1054,14 +1012,13 @@ func (s *Server) TakeSnapshot() error { state = []byte{0} } - var peerNames []string + var peers []*Peer for _, peer := range s.peers { - peerNames = append(peerNames, peer.Name()) + peers = append(peers, peer.clone()) } - peerNames = append(peerNames, s.Name()) - s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path} + s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path} s.saveSnapshot() @@ -1144,8 +1101,8 @@ func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S s.peers = make(map[string]*Peer) // recovery the cluster configuration - for _, peerName := range req.Peers { - s.AddPeer(peerName) + for _, peer := range req.Peers { + s.AddPeer(peer.Name, peer.ConnectionString) } //update term and index @@ -1237,8 +1194,8 @@ func (s *Server) LoadSnapshot() error { return err } - for _, peerName := range s.lastSnapshot.Peers { - s.AddPeer(peerName) + for _, peer := range s.lastSnapshot.Peers { + s.AddPeer(peer.Name, peer.ConnectionString) } s.log.startTerm = s.lastSnapshot.LastTerm @@ -1248,6 +1205,62 @@ func (s *Server) LoadSnapshot() error { return err } +//-------------------------------------- +// Config File +//-------------------------------------- + +func (s *Server) writeConf() { + + peers := make([]*Peer, len(s.peers)) + + i := 0 + for _, peer := range s.peers { + peers[i] = peer.clone() + i++ + } + + r := &Config{ + CommitIndex: s.log.commitIndex, + Peers: peers, + } + + b, _ := json.Marshal(r) + + confPath := path.Join(s.path, "conf") + tmpConfPath := path.Join(s.path, "conf.tmp") + + err := ioutil.WriteFile(tmpConfPath, b, 0600) + + if err != nil { + panic(err) + } + + os.Rename(tmpConfPath, confPath) +} + +// Read the configuration for the server. +func (s *Server) readConf() error { + confPath := path.Join(s.path, "conf") + s.debugln("readConf.open ", confPath) + + // open conf file + b, err := ioutil.ReadFile(confPath) + + if err != nil { + return nil + } + + conf := &Config{} + + if err = json.Unmarshal(b, conf); err != nil { + return err + } + + s.log.commitIndex = conf.CommitIndex + + return nil +} + //-------------------------------------- // Debugging //-------------------------------------- diff --git a/third_party/github.com/coreos/go-raft/server_test.go b/third_party/github.com/coreos/go-raft/server_test.go index 0410846a2..2a1559970 100644 --- a/third_party/github.com/coreos/go-raft/server_test.go +++ b/third_party/github.com/coreos/go-raft/server_test.go @@ -164,10 +164,10 @@ func TestServerPromote(t *testing.T) { lookup := map[string]*Server{} transporter := &testTransporter{} transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { - return lookup[peer.Name()].RequestVote(req) + return lookup[peer.Name].RequestVote(req) } transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { - return lookup[peer.Name()].AppendEntries(req) + return lookup[peer.Name].AppendEntries(req) } servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup) @@ -316,6 +316,124 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) { } } +//-------------------------------------- +// Recovery +//-------------------------------------- + +// Ensure that a follower cannot execute a command. +func TestServerRecoverFromPreviousLogAndConf(t *testing.T) { + // Initialize the servers. + var mutex sync.RWMutex + servers := map[string]*Server{} + + transporter := &testTransporter{} + transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { + mutex.RLock() + s := servers[peer.Name] + mutex.RUnlock() + return s.RequestVote(req) + } + transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { + mutex.RLock() + s := servers[peer.Name] + mutex.RUnlock() + return s.AppendEntries(req) + } + + disTransporter := &testTransporter{} + disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { + return nil + } + disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { + return nil + } + + var names []string + var paths = make(map[string]string) + + n := 5 + + // add n servers + for i := 1; i <= n; i++ { + names = append(names, strconv.Itoa(i)) + } + + var leader *Server + for _, name := range names { + server := newTestServer(name, transporter) + + servers[name] = server + paths[name] = server.Path() + + if name == "1" { + leader = server + server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.Start() + time.Sleep(testHeartbeatTimeout) + } else { + server.SetElectionTimeout(testElectionTimeout) + server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.Start() + time.Sleep(testHeartbeatTimeout) + } + if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil { + t.Fatalf("Unable to join server[%s]: %v", name, err) + } + + } + + // commit some commands + for i := 0; i < 10; i++ { + if _, err := leader.Do(&testCommand2{X: 1}); err != nil { + t.Fatalf("cannot commit command:", err.Error()) + } + } + + time.Sleep(2 * testHeartbeatTimeout) + + for _, name := range names { + server := servers[name] + if server.CommitIndex() != 16 { + t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16) + } + server.Stop() + } + + for _, name := range names { + // with old path and disable transportation + server := newTestServerWithPath(name, disTransporter, paths[name]) + servers[name] = server + + server.Start() + + // should only commit to the last join command + if server.CommitIndex() != 6 { + t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 6) + } + + // peer conf should be recovered + if len(server.Peers()) != 4 { + t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(server.Peers()), 4) + } + } + + // let nodes talk to each other + for _, name := range names { + servers[name].SetTransporter(transporter) + } + + time.Sleep(2 * testElectionTimeout) + + // should commit to the previous index + 1(nop command when new leader elected) + for _, name := range names { + server := servers[name] + if server.CommitIndex() != 17 { + t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16) + } + server.Stop() + } +} + //-------------------------------------- // Membership //-------------------------------------- @@ -357,13 +475,13 @@ func TestServerMultiNode(t *testing.T) { transporter := &testTransporter{} transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { mutex.RLock() - s := servers[peer.name] + s := servers[peer.Name] mutex.RUnlock() return s.RequestVote(req) } transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { mutex.RLock() - s := servers[peer.name] + s := servers[peer.Name] mutex.RUnlock() return s.AppendEntries(req) } diff --git a/third_party/github.com/coreos/go-raft/snapshot.go b/third_party/github.com/coreos/go-raft/snapshot.go index d35474f8a..fd41c08f0 100644 --- a/third_party/github.com/coreos/go-raft/snapshot.go +++ b/third_party/github.com/coreos/go-raft/snapshot.go @@ -21,9 +21,9 @@ type Snapshot struct { LastIndex uint64 `json:"lastIndex"` LastTerm uint64 `json:"lastTerm"` // cluster configuration. - Peers []string `json: "peers"` - State []byte `json: "state"` - Path string `json: "path"` + Peers []*Peer `json: "peers"` + State []byte `json: "state"` + Path string `json: "path"` } // Save the snapshot to a file diff --git a/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go b/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go index e6a0efe8e..57b3e3a88 100644 --- a/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go +++ b/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go @@ -12,7 +12,7 @@ type SnapshotRecoveryRequest struct { LeaderName string LastIndex uint64 LastTerm uint64 - Peers []string + Peers []*Peer State []byte } @@ -36,11 +36,21 @@ func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *Snapshot // Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes // written and any error that may have occurred. func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) { + + protoPeers := make([]*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer, len(req.Peers)) + + for i, peer := range req.Peers { + protoPeers[i] = &protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer{ + Name: proto.String(peer.Name), + ConnectionString: proto.String(peer.ConnectionString), + } + } + pb := &protobuf.ProtoSnapshotRecoveryRequest{ LeaderName: proto.String(req.LeaderName), LastIndex: proto.Uint64(req.LastIndex), LastTerm: proto.Uint64(req.LastTerm), - Peers: req.Peers, + Peers: protoPeers, State: req.State, } p, err := proto.Marshal(pb) @@ -62,7 +72,7 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) { totalBytes := len(data) - pb := &protobuf.ProtoSnapshotRequest{} + pb := &protobuf.ProtoSnapshotRecoveryRequest{} if err = proto.Unmarshal(data, pb); err != nil { return -1, err } @@ -70,8 +80,16 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) { req.LeaderName = pb.GetLeaderName() req.LastIndex = pb.GetLastIndex() req.LastTerm = pb.GetLastTerm() - req.Peers = req.Peers req.State = req.State + req.Peers = make([]*Peer, len(pb.Peers)) + + for i, peer := range pb.Peers { + req.Peers[i] = &Peer{ + Name: peer.GetName(), + ConnectionString: peer.GetConnectionString(), + } + } + return totalBytes, nil } diff --git a/third_party/github.com/coreos/go-raft/test.go b/third_party/github.com/coreos/go-raft/test.go index 606594bf7..025cf0f58 100644 --- a/third_party/github.com/coreos/go-raft/test.go +++ b/third_party/github.com/coreos/go-raft/test.go @@ -69,6 +69,11 @@ func newTestServer(name string, transporter Transporter) *Server { return server } +func newTestServerWithPath(name string, transporter Transporter, p string) *Server { + server, _ := NewServer(name, p, transporter, nil, nil) + return server +} + func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server { server := newTestServer(name, transporter) f, err := os.Create(server.LogPath()) @@ -100,7 +105,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]* server.SetHeartbeatTimeout(testHeartbeatTimeout) server.Start() for _, peer := range servers { - server.AddPeer(peer.Name()) + server.AddPeer(peer.Name(), "") } } return servers From 13afdb082527b16b5d13612f56ab1d6be277edb3 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sun, 18 Aug 2013 19:43:14 -0700 Subject: [PATCH 2/7] bump(github.com/coreos/go-etcd): 460022c1238ee0913013936e3486f41a3c7c1d7a --- third_party/github.com/coreos/go-etcd/etcd/client.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index c4150c09b..b73d60d05 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -123,12 +123,19 @@ func (c *Client) internalSyncCluster(machines []string) bool { continue } else { b, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() if err != nil { // try another machine in the cluster continue } // update Machines List c.cluster.Machines = strings.Split(string(b), ",") + + // update leader + // the first one in the machine list is the leader + logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0]) + c.cluster.Leader = c.cluster.Machines[0] + logger.Debug("sync.machines ", c.cluster.Machines) return true } @@ -139,6 +146,7 @@ func (c *Client) internalSyncCluster(machines []string) bool { // serverName should contain both hostName and port func (c *Client) createHttpPath(serverName string, _path string) string { httpPath := path.Join(serverName, _path) + httpPath = c.config.Scheme + "://" + httpPath return httpPath } From 7ec0ee2a19085332b70a254ee2aa6281b557a5c8 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sun, 18 Aug 2013 19:43:19 -0700 Subject: [PATCH 3/7] bump(github.com/ccding/go-logging): 4f3650d51969cc425c1940efa31fcb7c0bba86b3 --- .../github.com/ccding/go-logging/README.md | 9 ---- .../github.com/ccding/go-logging/example.go | 2 +- .../ccding/go-logging/logging/commands.go | 2 +- .../ccding/go-logging/logging/fields.go | 2 +- .../ccding/go-logging/logging/fields_test.go | 2 +- .../ccding/go-logging/logging/formater.go | 2 +- .../ccding/go-logging/logging/get_go_id.c | 2 +- .../ccding/go-logging/logging/level.go | 2 +- .../ccding/go-logging/logging/logging.go | 43 +++++++++---------- .../ccding/go-logging/logging/logging_test.go | 2 +- .../ccding/go-logging/logging/request.go | 2 +- .../ccding/go-logging/logging/writer.go | 2 +- 12 files changed, 31 insertions(+), 41 deletions(-) diff --git a/third_party/github.com/ccding/go-logging/README.md b/third_party/github.com/ccding/go-logging/README.md index dbc8187cb..cfcefb603 100644 --- a/third_party/github.com/ccding/go-logging/README.md +++ b/third_party/github.com/ccding/go-logging/README.md @@ -5,14 +5,6 @@ go-logging is a high-performance logging library for golang. low delay of about 800 nano-seconds. ## Getting Started -The stable version is under the `stable` branch, which does never revert and -is fully tested. The tags in `stable` branch indicate the version numbers. - -However, `master` branch is unstable version, and `dev` branch is development -branch. `master` branch merges `dev` branch periodically. - -Btw, all the pull request should be sent to the `dev` branch. - ### Installation The step below will download the library source code to `${GOPATH}/src/github.com/ccding/go-logging`. @@ -46,7 +38,6 @@ import ( func main() { logger, _ := logging.SimpleLogger("main") - logger.SetLevel(logging.DEBUG) logger.Error("this is a test from error") logger.Destroy() } diff --git a/third_party/github.com/ccding/go-logging/example.go b/third_party/github.com/ccding/go-logging/example.go index 5d9fc6397..1b82842fa 100644 --- a/third_party/github.com/ccding/go-logging/example.go +++ b/third_party/github.com/ccding/go-logging/example.go @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + package main import ( diff --git a/third_party/github.com/ccding/go-logging/logging/commands.go b/third_party/github.com/ccding/go-logging/logging/commands.go index 6cf2bb4a3..be1940598 100644 --- a/third_party/github.com/ccding/go-logging/logging/commands.go +++ b/third_party/github.com/ccding/go-logging/logging/commands.go @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + package logging // Logln receives log request from the client. The request includes a set of diff --git a/third_party/github.com/ccding/go-logging/logging/fields.go b/third_party/github.com/ccding/go-logging/logging/fields.go index 74c05b190..aff57fca9 100644 --- a/third_party/github.com/ccding/go-logging/logging/fields.go +++ b/third_party/github.com/ccding/go-logging/logging/fields.go @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + package logging import ( diff --git a/third_party/github.com/ccding/go-logging/logging/fields_test.go b/third_party/github.com/ccding/go-logging/logging/fields_test.go index 8433850d9..7efae3b07 100644 --- a/third_party/github.com/ccding/go-logging/logging/fields_test.go +++ b/third_party/github.com/ccding/go-logging/logging/fields_test.go @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + package logging import ( diff --git a/third_party/github.com/ccding/go-logging/logging/formater.go b/third_party/github.com/ccding/go-logging/logging/formater.go index c704d57ae..8be2a31d2 100644 --- a/third_party/github.com/ccding/go-logging/logging/formater.go +++ b/third_party/github.com/ccding/go-logging/logging/formater.go @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + package logging import ( diff --git a/third_party/github.com/ccding/go-logging/logging/get_go_id.c b/third_party/github.com/ccding/go-logging/logging/get_go_id.c index 400848cf9..f9b216f9f 100644 --- a/third_party/github.com/ccding/go-logging/logging/get_go_id.c +++ b/third_party/github.com/ccding/go-logging/logging/get_go_id.c @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + // This file defines GetGoId function, which is used to get the id of the // current goroutine. More details about this function are availeble in the // runtime.c file of golang source code. diff --git a/third_party/github.com/ccding/go-logging/logging/level.go b/third_party/github.com/ccding/go-logging/logging/level.go index e640fa626..4ada90a03 100644 --- a/third_party/github.com/ccding/go-logging/logging/level.go +++ b/third_party/github.com/ccding/go-logging/logging/level.go @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + package logging // Level is the type of level. diff --git a/third_party/github.com/ccding/go-logging/logging/logging.go b/third_party/github.com/ccding/go-logging/logging/logging.go index 1981c9a61..6467d94ef 100644 --- a/third_party/github.com/ccding/go-logging/logging/logging.go +++ b/third_party/github.com/ccding/go-logging/logging/logging.go @@ -99,13 +99,15 @@ func RichLogger(name string) (*Logger, error) { func FileLogger(name string, level Level, format string, timeFormat string, file string, sync bool) (*Logger, error) { out, err := os.Create(file) if err != nil { - return new(Logger), err + return nil, err } logger, err := createLogger(name, level, format, timeFormat, out, sync) if err == nil { logger.fd = out + return logger, nil + } else { + return nil, err } - return logger, err } // WriterLogger creates a new logger with a writer @@ -115,38 +117,35 @@ func WriterLogger(name string, level Level, format string, timeFormat string, ou // WriterLogger creates a new logger from a configuration file func ConfigLogger(filename string) (*Logger, error) { - conf, err := config.Read(filename) + conf := config.NewConfig(filename) + err := conf.Read() if err != nil { - return new(Logger), err + return nil, err } - ok := true - name, ok := conf["name"] - if !ok { - name = "" - } - slevel, ok := conf["level"] - if !ok { + name := conf.Get("", "name") + slevel := conf.Get("", "level") + if slevel == "" { slevel = "0" } l, err := strconv.Atoi(slevel) if err != nil { - return new(Logger), err + return nil, err } level := Level(l) - format, ok := conf["format"] - if !ok { + format := conf.Get("", "format") + if format == "" { format = BasicFormat } - timeFormat, ok := conf["timeFormat"] - if !ok { + timeFormat := conf.Get("", "timeFormat") + if timeFormat == "" { timeFormat = DefaultTimeFormat } - ssync, ok := conf["sync"] - if !ok { + ssync := conf.Get("", "sync") + if ssync == "" { ssync = "0" } - file, ok := conf["file"] - if !ok { + file := conf.Get("", "file") + if file == "" { file = DefaultFileName } sync := true @@ -155,7 +154,7 @@ func ConfigLogger(filename string) (*Logger, error) { } else if ssync == "1" { sync = true } else { - return new(Logger), err + return nil, err } return FileLogger(name, level, format, timeFormat, file, sync) } @@ -166,7 +165,7 @@ func createLogger(name string, level Level, format string, timeFormat string, ou err := logger.parseFormat(format) if err != nil { - return logger, err + return nil, err } // asign values to logger diff --git a/third_party/github.com/ccding/go-logging/logging/logging_test.go b/third_party/github.com/ccding/go-logging/logging/logging_test.go index b6e0fa502..fcf4bcce6 100644 --- a/third_party/github.com/ccding/go-logging/logging/logging_test.go +++ b/third_party/github.com/ccding/go-logging/logging/logging_test.go @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + package logging import ( diff --git a/third_party/github.com/ccding/go-logging/logging/request.go b/third_party/github.com/ccding/go-logging/logging/request.go index e823fa719..4b8641083 100644 --- a/third_party/github.com/ccding/go-logging/logging/request.go +++ b/third_party/github.com/ccding/go-logging/logging/request.go @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + package logging // request struct stores the logger request diff --git a/third_party/github.com/ccding/go-logging/logging/writer.go b/third_party/github.com/ccding/go-logging/logging/writer.go index 12d17901a..9efeddf60 100644 --- a/third_party/github.com/ccding/go-logging/logging/writer.go +++ b/third_party/github.com/ccding/go-logging/logging/writer.go @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + package logging import ( From 2662b3c5594f3fc8ea9e7120410d7cd586e4ff03 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sun, 18 Aug 2013 19:43:20 -0700 Subject: [PATCH 4/7] bump(github.com/ccding/go-config-reader): 8b6c2b50197f20da3b1c5944c274c173634dc056 --- .../ccding/go-config-reader/config/config.go | 140 +++++++++++++++--- .../ccding/go-config-reader/example.go | 12 +- 2 files changed, 127 insertions(+), 25 deletions(-) diff --git a/third_party/github.com/ccding/go-config-reader/config/config.go b/third_party/github.com/ccding/go-config-reader/config/config.go index 97b899ef2..36ca20dec 100644 --- a/third_party/github.com/ccding/go-config-reader/config/config.go +++ b/third_party/github.com/ccding/go-config-reader/config/config.go @@ -13,23 +13,60 @@ // limitations under the License. // // author: Cong Ding -// + package config import ( "bufio" "errors" + "fmt" + "io/ioutil" "os" "strings" ) var commentPrefix = []string{"//", "#", ";"} -func Read(filename string) (map[string]string, error) { - var res = map[string]string{} - in, err := os.Open(filename) +// Config struct constructs a new configuration handler. +type Config struct { + filename string + config map[string]map[string]string +} + +// NewConfig function cnstructs a new Config struct with filename. You have to +// call Read() function to let it read from the file. Otherwise you will get +// empty string (i.e., "") when you are calling Get() function. Another usage +// is that you call NewConfig() function and then call Add()/Set() function to +// add new key-values to the configuration. Finally you can call Write() +// function to write the new configuration to the file. +func NewConfig(filename string) *Config { + c := new(Config) + c.filename = filename + c.config = make(map[string]map[string]string) + return c +} + +// Filename function returns the filename of the configuration. +func (c *Config) Filename() string { + return c.filename +} + +// SetFilename function sets the filename of the configuration. +func (c *Config) SetFilename(filename string) { + c.filename = filename +} + +// Reset function reset the map in the configuration. +func (c *Config) Reset() { + c.config = make(map[string]map[string]string) +} + +// Read function reads configurations from the file defined in +// Config.filename. +func (c *Config) Read() error { + in, err := os.Open(c.filename) if err != nil { - return res, err + return err } defer in.Close() scanner := bufio.NewScanner(in) @@ -40,9 +77,9 @@ func Read(filename string) (map[string]string, error) { continue } if line == "" { - sec := checkSection(scanner.Text()) - if sec != "" { - section = sec + "." + sec, ok := checkSection(scanner.Text()) + if ok { + section = sec continue } } @@ -54,40 +91,103 @@ func Read(filename string) (map[string]string, error) { line = line[:len(line)-1] continue } - key, value, err := checkLine(line) - if err != nil { - return res, errors.New("WRONG: " + line) + key, value, ok := checkLine(line) + if !ok { + return errors.New("WRONG: " + line) } - res[section+key] = value + c.Set(section, key, value) line = "" } - return res, nil + return nil } -func checkSection(line string) string { +// Get function returns the value of a key in the configuration. If the key +// does not exist, it returns empty string (i.e., ""). +func (c *Config) Get(section string, key string) string { + value, ok := c.config[section][key] + if !ok { + return "" + } + return value +} + +// Set function updates the value of a key in the configuration. Function +// Set() is exactly the same as function Add(). +func (c *Config) Set(section string, key string, value string) { + _, ok := c.config[section] + if !ok { + c.config[section] = make(map[string]string) + } + c.config[section][key] = value +} + +// Add function adds a new key to the configuration. Function Add() is exactly +// the same as function Set(). +func (c *Config) Add(section string, key string, value string) { + c.Set(section, key, value) +} + +// Del function deletes a key from the configuration. +func (c *Config) Del(section string, key string) { + _, ok := c.config[section] + if ok { + delete(c.config[section], key) + if len(c.config[section]) == 0 { + delete(c.config, section) + } + } +} + +// Write function writes the updated configuration back. +func (c *Config) Write() error { + return nil +} + +// WriteTo function writes the configuration to a new file. This function +// re-organizes the configuration and deletes all the comments. +func (c *Config) WriteTo(filename string) error { + content := "" + for k, v := range c.config { + format := "%v = %v\n" + if k != "" { + content += fmt.Sprintf("[%v]\n", k) + format = "\t" + format + } + for key, value := range v { + content += fmt.Sprintf(format, key, value) + } + } + return ioutil.WriteFile(filename, []byte(content), 0644) +} + +// To check this line is a section or not. If it is not a section, it returns +// "". +func checkSection(line string) (string, bool) { line = strings.TrimSpace(line) lineLen := len(line) if lineLen < 2 { - return "" + return "", false } if line[0] == '[' && line[lineLen-1] == ']' { - return line[1 : lineLen-1] + return line[1 : lineLen-1], true } - return "" + return "", false } -func checkLine(line string) (string, string, error) { +// To check this line is a valid key-value pair or not. +func checkLine(line string) (string, string, bool) { key := "" value := "" sp := strings.SplitN(line, "=", 2) if len(sp) != 2 { - return key, value, errors.New("WRONG: " + line) + return key, value, false } key = strings.TrimSpace(sp[0]) value = strings.TrimSpace(sp[1]) - return key, value, nil + return key, value, true } +// To check this line is a whole line comment or not. func checkComment(line string) bool { line = strings.TrimSpace(line) for p := range commentPrefix { diff --git a/third_party/github.com/ccding/go-config-reader/example.go b/third_party/github.com/ccding/go-config-reader/example.go index da49575f2..17587442b 100644 --- a/third_party/github.com/ccding/go-config-reader/example.go +++ b/third_party/github.com/ccding/go-config-reader/example.go @@ -13,7 +13,7 @@ // limitations under the License. // // author: Cong Ding -// + package main import ( @@ -22,9 +22,11 @@ import ( ) func main() { - res, err := config.Read("example.conf") + c := config.NewConfig("example.conf") + err := c.Read() fmt.Println(err) - fmt.Println(res) - fmt.Println(res["test.a"]) - fmt.Println(res["dd"]) + fmt.Println(c) + fmt.Println(c.Get("test", "a")) + fmt.Println(c.Get("", "dd")) + c.WriteTo("example2.conf") } From ea28b1cdf3073902765ed0e2603bfd8e3182cb35 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sun, 18 Aug 2013 19:43:24 -0700 Subject: [PATCH 5/7] bump(code.google.com/p/go.net): bc411e2ac33f --- .../p/go.net/netutil/listen.go | 50 ++++++++++++++ .../p/go.net/netutil/listen_test.go | 65 +++++++++++++++++++ .../code.google.com/p/go.net/proxy/proxy.go | 2 +- 3 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 third_party/code.google.com/p/go.net/netutil/listen.go create mode 100644 third_party/code.google.com/p/go.net/netutil/listen_test.go diff --git a/third_party/code.google.com/p/go.net/netutil/listen.go b/third_party/code.google.com/p/go.net/netutil/listen.go new file mode 100644 index 000000000..b23c6e99b --- /dev/null +++ b/third_party/code.google.com/p/go.net/netutil/listen.go @@ -0,0 +1,50 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package netutil provides network utility functions, complementing the more +// common ones in the net package. +package netutil + +import ( + "net" + "sync" +) + +// LimitListener returns a Listener that accepts at most n simultaneous +// connections from the provided Listener. +func LimitListener(l net.Listener, n int) net.Listener { + ch := make(chan struct{}, n) + for i := 0; i < n; i++ { + ch <- struct{}{} + } + return &limitListener{l, ch} +} + +type limitListener struct { + net.Listener + ch chan struct{} +} + +func (l *limitListener) Accept() (net.Conn, error) { + <-l.ch + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + return &limitListenerConn{Conn: c, ch: l.ch}, nil +} + +type limitListenerConn struct { + net.Conn + ch chan<- struct{} + close sync.Once +} + +func (l *limitListenerConn) Close() error { + err := l.Conn.Close() + l.close.Do(func() { + l.ch <- struct{}{} + }) + return err +} diff --git a/third_party/code.google.com/p/go.net/netutil/listen_test.go b/third_party/code.google.com/p/go.net/netutil/listen_test.go new file mode 100644 index 000000000..240eca1ea --- /dev/null +++ b/third_party/code.google.com/p/go.net/netutil/listen_test.go @@ -0,0 +1,65 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package netutil + +import ( + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestLimitListener(t *testing.T) { + const ( + max = 5 + num = 200 + ) + + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Listen: %v", err) + } + defer l.Close() + l = LimitListener(l, max) + + var open int32 + go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if n := atomic.AddInt32(&open, 1); n > max { + t.Errorf("%d open connections, want <= %d", n, max) + } + defer atomic.AddInt32(&open, -1) + time.Sleep(10 * time.Millisecond) + fmt.Fprint(w, "some body") + })) + + var wg sync.WaitGroup + var failed int32 + for i := 0; i < num; i++ { + wg.Add(1) + go func() { + defer wg.Done() + r, err := http.Get("http://" + l.Addr().String()) + if err != nil { + t.Logf("Get: %v", err) + atomic.AddInt32(&failed, 1) + return + } + defer r.Body.Close() + io.Copy(ioutil.Discard, r.Body) + }() + } + wg.Wait() + + // We expect some Gets to fail as the kernel's accept queue is filled, + // but most should succeed. + if failed >= num/2 { + t.Errorf("too many Gets failed") + } +} diff --git a/third_party/code.google.com/p/go.net/proxy/proxy.go b/third_party/code.google.com/p/go.net/proxy/proxy.go index b6cfd4510..8ccb0c5f7 100644 --- a/third_party/code.google.com/p/go.net/proxy/proxy.go +++ b/third_party/code.google.com/p/go.net/proxy/proxy.go @@ -24,7 +24,7 @@ type Auth struct { User, Password string } -// DefaultDialer returns the dialer specified by the proxy related variables in +// FromEnvironment returns the dialer specified by the proxy related variables in // the environment. func FromEnvironment() Dialer { allProxy := os.Getenv("all_proxy") From 111888adea7a53f550a1b9f1cdad5d3758d275e9 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sun, 18 Aug 2013 19:43:26 -0700 Subject: [PATCH 6/7] bump(code.google.com/p/goprotobuf): 1141ccae4b85 --- .../code.google.com/p/goprotobuf/proto/lib.go | 28 +++++-------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/third_party/code.google.com/p/goprotobuf/proto/lib.go b/third_party/code.google.com/p/goprotobuf/proto/lib.go index dfb4206d3..fa6fe22ea 100644 --- a/third_party/code.google.com/p/goprotobuf/proto/lib.go +++ b/third_party/code.google.com/p/goprotobuf/proto/lib.go @@ -332,17 +332,13 @@ func (p *Buffer) buffree(s []byte) { // Bool is a helper routine that allocates a new bool value // to store v and returns a pointer to it. func Bool(v bool) *bool { - p := new(bool) - *p = v - return p + return &v } // Int32 is a helper routine that allocates a new int32 value // to store v and returns a pointer to it. func Int32(v int32) *int32 { - p := new(int32) - *p = v - return p + return &v } // Int is a helper routine that allocates a new int32 value @@ -357,25 +353,19 @@ func Int(v int) *int32 { // Int64 is a helper routine that allocates a new int64 value // to store v and returns a pointer to it. func Int64(v int64) *int64 { - p := new(int64) - *p = v - return p + return &v } // Float32 is a helper routine that allocates a new float32 value // to store v and returns a pointer to it. func Float32(v float32) *float32 { - p := new(float32) - *p = v - return p + return &v } // Float64 is a helper routine that allocates a new float64 value // to store v and returns a pointer to it. func Float64(v float64) *float64 { - p := new(float64) - *p = v - return p + return &v } // Uint32 is a helper routine that allocates a new uint32 value @@ -389,17 +379,13 @@ func Uint32(v uint32) *uint32 { // Uint64 is a helper routine that allocates a new uint64 value // to store v and returns a pointer to it. func Uint64(v uint64) *uint64 { - p := new(uint64) - *p = v - return p + return &v } // String is a helper routine that allocates a new string value // to store v and returns a pointer to it. func String(v string) *string { - p := new(string) - *p = v - return p + return &v } // EnumName is a helper function to simplify printing protocol buffer enums From f813017f1bcbb7be21707f30cbea4b4af9f66264 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 18 Aug 2013 21:12:36 -0700 Subject: [PATCH 7/7] fix raft api --- command.go | 2 +- .../github.com/coreos/go-etcd/etcd/client.go | 27 ++++++++++--------- transporter.go | 8 +++--- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/command.go b/command.go index 480d9db70..5a5149a6e 100644 --- a/command.go +++ b/command.go @@ -155,7 +155,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { addNameToURL(c.Name, c.RaftURL, c.EtcdURL) // add peer in raft - err := raftServer.AddPeer(c.Name) + err := raftServer.AddPeer(c.Name, "") // add machine in etcd storage key := path.Join("_etcd/machines", c.Name) diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index b73d60d05..7624c6a9d 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "net" "net/http" + "net/url" "path" "strings" "time" @@ -39,10 +40,10 @@ func NewClient() *Client { // default leader and machines cluster := Cluster{ - Leader: "0.0.0.0:4001", + Leader: "http://0.0.0.0:4001", Machines: make([]string, 1), } - cluster.Machines[0] = "0.0.0.0:4001" + cluster.Machines[0] = "http://0.0.0.0:4001" config := Config{ // default use http @@ -145,9 +146,9 @@ func (c *Client) internalSyncCluster(machines []string) bool { // serverName should contain both hostName and port func (c *Client) createHttpPath(serverName string, _path string) string { - httpPath := path.Join(serverName, _path) - httpPath = c.config.Scheme + "://" + httpPath - return httpPath + u, _ := url.Parse(serverName) + u.Path = path.Join(u.Path, "/", _path) + return u.String() } // Dial with timeout. @@ -156,22 +157,21 @@ func dialTimeout(network, addr string) (net.Conn, error) { } func (c *Client) getHttpPath(s ...string) string { - httpPath := path.Join(c.cluster.Leader, version) + u, _ := url.Parse(c.cluster.Leader) + + u.Path = path.Join(u.Path, "/", version) for _, seg := range s { - httpPath = path.Join(httpPath, seg) + u.Path = path.Join(u.Path, seg) } - httpPath = c.config.Scheme + "://" + httpPath - return httpPath + return u.String() } func (c *Client) updateLeader(httpPath string) { - // httpPath http://127.0.0.1:4001/v1... - leader := strings.Split(httpPath, "://")[1] - // we want to have 127.0.0.1:4001 + u, _ := url.Parse(httpPath) + leader := u.Host - leader = strings.Split(leader, "/")[0] logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader) c.cluster.Leader = leader } @@ -188,6 +188,7 @@ func (c *Client) sendRequest(method string, _path string, body string) (*http.Re for { httpPath := c.getHttpPath(_path) + logger.Debug("send.request.to ", httpPath) if body == "" { diff --git a/transporter.go b/transporter.go index 1c1eb41be..c49479bc8 100644 --- a/transporter.go +++ b/transporter.go @@ -47,7 +47,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u, _ := nameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name) debugf("Send LogEntries to %s ", u) resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) @@ -74,7 +74,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req * var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u, _ := nameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name) debugf("Send Vote to %s", u) resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b) @@ -100,7 +100,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u, _ := nameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name) debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) @@ -128,7 +128,7 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u, _ := nameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name) debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex)