bump(github.com/goraft/raft): 6bf34b9

Move from coreos/raft to goraft/raft and update to latest.
This commit is contained in:
Ben Johnson
2014-03-24 15:09:47 -07:00
parent 7d4fda550d
commit 174b9ff343
95 changed files with 1269 additions and 1241 deletions

View File

@@ -25,7 +25,7 @@ import (
"runtime"
"time"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/config"
ehttp "github.com/coreos/etcd/http"

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -6,7 +6,7 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -12,7 +12,7 @@ import (
"strconv"
"time"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
"github.com/coreos/etcd/discovery"

View File

@@ -7,7 +7,7 @@ import (
"strconv"
"time"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error"

View File

@@ -3,7 +3,7 @@ package server
import (
"time"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
type raftServerStats struct {

View File

@@ -6,7 +6,7 @@ import (
"os"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -8,7 +8,7 @@ import (
"strings"
"time"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error"

View File

@@ -1,7 +1,7 @@
package server
import (
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -10,7 +10,7 @@ import (
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
// Transporter layer for communication between raft nodes

View File

@@ -5,7 +5,7 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
)

View File

@@ -2,7 +2,7 @@ package v1
import (
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"net/http"
)

View File

@@ -9,7 +9,7 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
)

View File

@@ -7,7 +7,7 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
)

View File

@@ -2,7 +2,7 @@ package v2
import (
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"net/http"
)

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"time"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
// A lookup of factories by version.

View File

@@ -4,7 +4,7 @@ import (
"time"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -3,7 +3,7 @@ package v2
import (
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -5,7 +5,7 @@ import (
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -5,7 +5,7 @@ import (
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -3,7 +3,7 @@ package v2
import (
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -5,7 +5,7 @@ import (
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -4,7 +4,7 @@ import (
"time"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {

View File

@@ -3,7 +3,7 @@ package v2
import (
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"time"
)

View File

@@ -37,7 +37,7 @@ func TestV1SoloMigration(t *testing.T) {
args := []string{"etcd", fmt.Sprintf("-data-dir=%s", nodepath)}
args = append(args, "-addr", "127.0.0.1:4001")
args = append(args, "-peer-addr", "127.0.0.1:7001")
args = append(args, "-name", "v1")
args = append(args, "-name", "node0")
process, err := os.StartProcess(EtcdBinPath, args, procAttr)
if err != nil {
t.Fatal("start process failed:" + err.Error())

View File

@@ -4,7 +4,7 @@ import (
"net/http"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/stretchr/testify/mock"
)

View File

@@ -7,7 +7,7 @@ import (
"sync"
"time"
"github.com/coreos/etcd/third_party/github.com/coreos/raft"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/metrics"
"github.com/coreos/etcd/server"

View File

@@ -89,7 +89,7 @@
package example
import "code.google.com/p/gogoprotobuf/proto"
import "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
type FOO int32
const (
@@ -168,7 +168,7 @@
import (
"log"
"code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"./example.pb"
)

View File

@@ -4,7 +4,7 @@
package testdata
import proto "code.google.com/p/gogoprotobuf/proto"
import proto "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
import json "encoding/json"
import math "math"

View File

@@ -1,10 +0,0 @@
language: go
go:
- 1.1.2
- 1.2
install:
- go get github.com/stretchr/testify/assert
- make dependencies

View File

@@ -1,82 +0,0 @@
package raft
import (
"io"
"io/ioutil"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/coreos/raft/protobuf"
)
// The request sent to a server to append entries to the log.
type AppendEntriesRequest struct {
Term uint64
PrevLogIndex uint64
PrevLogTerm uint64
CommitIndex uint64
LeaderName string
Entries []*protobuf.LogEntry
}
// Creates a new AppendEntries request.
func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint64,
commitIndex uint64, leaderName string, entries []*LogEntry) *AppendEntriesRequest {
pbEntries := make([]*protobuf.LogEntry, len(entries))
for i := range entries {
pbEntries[i] = entries[i].pb
}
return &AppendEntriesRequest{
Term: term,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
CommitIndex: commitIndex,
LeaderName: leaderName,
Entries: pbEntries,
}
}
// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
pb := &protobuf.AppendEntriesRequest{
Term: proto.Uint64(req.Term),
PrevLogIndex: proto.Uint64(req.PrevLogIndex),
PrevLogTerm: proto.Uint64(req.PrevLogTerm),
CommitIndex: proto.Uint64(req.CommitIndex),
LeaderName: proto.String(req.LeaderName),
Entries: req.Entries,
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return -1, err
}
pb := new(protobuf.AppendEntriesRequest)
if err := proto.Unmarshal(data, pb); err != nil {
return -1, err
}
req.Term = pb.GetTerm()
req.PrevLogIndex = pb.GetPrevLogIndex()
req.PrevLogTerm = pb.GetPrevLogTerm()
req.CommitIndex = pb.GetCommitIndex()
req.LeaderName = pb.GetLeaderName()
req.Entries = pb.GetEntries()
return len(data), nil
}

View File

@@ -1,73 +0,0 @@
package raft
import (
"io"
"io/ioutil"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/coreos/raft/protobuf"
)
// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
pb *protobuf.AppendEntriesResponse
peer string
append bool
}
// Creates a new AppendEntries response.
func newAppendEntriesResponse(term uint64, success bool, index uint64, commitIndex uint64) *AppendEntriesResponse {
pb := &protobuf.AppendEntriesResponse{
Term: proto.Uint64(term),
Index: proto.Uint64(index),
Success: proto.Bool(success),
CommitIndex: proto.Uint64(commitIndex),
}
return &AppendEntriesResponse{
pb: pb,
}
}
func (aer *AppendEntriesResponse) Index() uint64 {
return aer.pb.GetIndex()
}
func (aer *AppendEntriesResponse) CommitIndex() uint64 {
return aer.pb.GetCommitIndex()
}
func (aer *AppendEntriesResponse) Term() uint64 {
return aer.pb.GetTerm()
}
func (aer *AppendEntriesResponse) Success() bool {
return aer.pb.GetSuccess()
}
// Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error) {
b, err := proto.Marshal(resp.pb)
if err != nil {
return -1, err
}
return w.Write(b)
}
// Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return -1, err
}
resp.pb = new(protobuf.AppendEntriesResponse)
if err := proto.Unmarshal(data, resp.pb); err != nil {
return -1, err
}
return len(data), nil
}

View File

@@ -1,34 +0,0 @@
package raft
import (
"bytes"
"testing"
)
func BenchmarkAppendEntriesResponseEncoding(b *testing.B) {
req, tmp := createTestAppendEntriesResponse(2000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
var buf bytes.Buffer
req.Encode(&buf)
}
b.SetBytes(int64(len(tmp)))
}
func BenchmarkAppendEntriesResponseDecoding(b *testing.B) {
req, buf := createTestAppendEntriesResponse(2000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
req.Decode(bytes.NewReader(buf))
}
b.SetBytes(int64(len(buf)))
}
func createTestAppendEntriesResponse(entryCount int) (*AppendEntriesResponse, []byte) {
resp := newAppendEntriesResponse(1, true, 1, 1)
var buf bytes.Buffer
resp.Encode(&buf)
return resp, buf.Bytes()
}

View File

@@ -1,28 +0,0 @@
package raft
// Join command interface
type JoinCommand interface {
Command
NodeName() string
}
// Join command
type DefaultJoinCommand struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
}
// The name of the Join command in the log
func (c *DefaultJoinCommand) CommandName() string {
return "raft:join"
}
func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {
err := server.AddPeer(c.Name, c.ConnectionString)
return []byte("join"), err
}
func (c *DefaultJoinCommand) NodeName() string {
return c.Name
}

View File

@@ -1,26 +0,0 @@
package raft
// Leave command interface
type LeaveCommand interface {
Command
NodeName() string
}
// Leave command
type DefaultLeaveCommand struct {
Name string `json:"name"`
}
// The name of the Leave command in the log
func (c *DefaultLeaveCommand) CommandName() string {
return "raft:leave"
}
func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error) {
err := server.RemovePeer(c.Name)
return []byte("leave"), err
}
func (c *DefaultLeaveCommand) NodeName() string {
return c.Name
}

View File

@@ -1,26 +0,0 @@
package raft
import (
"io"
)
// NOP command
type NOPCommand struct {
}
// The name of the NOP command in the log
func (c NOPCommand) CommandName() string {
return "raft:nop"
}
func (c NOPCommand) Apply(server Server) (interface{}, error) {
return nil, nil
}
func (c NOPCommand) Encode(w io.Writer) error {
return nil
}
func (c NOPCommand) Decode(r io.Reader) error {
return nil
}

View File

@@ -1,69 +0,0 @@
package raft
import (
"io"
"io/ioutil"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/coreos/raft/protobuf"
)
// The request sent to a server to vote for a candidate to become a leader.
type RequestVoteRequest struct {
peer *Peer
Term uint64
LastLogIndex uint64
LastLogTerm uint64
CandidateName string
}
// Creates a new RequestVote request.
func newRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint64, lastLogTerm uint64) *RequestVoteRequest {
return &RequestVoteRequest{
Term: term,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
CandidateName: candidateName,
}
}
// Encodes the RequestVoteRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *RequestVoteRequest) Encode(w io.Writer) (int, error) {
pb := &protobuf.RequestVoteRequest{
Term: proto.Uint64(req.Term),
LastLogIndex: proto.Uint64(req.LastLogIndex),
LastLogTerm: proto.Uint64(req.LastLogTerm),
CandidateName: proto.String(req.CandidateName),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the RequestVoteRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *RequestVoteRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return -1, err
}
totalBytes := len(data)
pb := &protobuf.RequestVoteRequest{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
req.Term = pb.GetTerm()
req.LastLogIndex = pb.GetLastLogIndex()
req.LastLogTerm = pb.GetLastLogTerm()
req.CandidateName = pb.GetCandidateName()
return totalBytes, nil
}

View File

@@ -1,62 +0,0 @@
package raft
import (
"io"
"io/ioutil"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/coreos/raft/protobuf"
)
// The response returned from a server after a vote for a candidate to become a leader.
type RequestVoteResponse struct {
peer *Peer
Term uint64
VoteGranted bool
}
// Creates a new RequestVote response.
func newRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse {
return &RequestVoteResponse{
Term: term,
VoteGranted: voteGranted,
}
}
// Encodes the RequestVoteResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *RequestVoteResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.RequestVoteResponse{
Term: proto.Uint64(resp.Term),
VoteGranted: proto.Bool(resp.VoteGranted),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the RequestVoteResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *RequestVoteResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
totalBytes := len(data)
pb := &protobuf.RequestVoteResponse{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
resp.Term = pb.GetTerm()
resp.VoteGranted = pb.GetVoteGranted()
return totalBytes, nil
}

View File

@@ -1,61 +0,0 @@
package raft
import (
"encoding/json"
"fmt"
"hash/crc32"
"os"
)
// Snapshot represents an in-memory representation of the current state of the system.
type Snapshot struct {
LastIndex uint64 `json:"lastIndex"`
LastTerm uint64 `json:"lastTerm"`
// Cluster configuration.
Peers []*Peer `json:"peers"`
State []byte `json:"state"`
Path string `json:"path"`
}
// save writes the snapshot to file.
func (ss *Snapshot) save() error {
// Open the file for writing.
file, err := os.OpenFile(ss.Path, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
defer file.Close()
// Serialize to JSON.
b, err := json.Marshal(ss)
if err != nil {
return err
}
// Generate checksum and write it to disk.
checksum := crc32.ChecksumIEEE(b)
if _, err = fmt.Fprintf(file, "%08x\n", checksum); err != nil {
return err
}
// Write the snapshot to disk.
if _, err = file.Write(b); err != nil {
return err
}
// Ensure that the snapshot has been flushed to disk before continuing.
if err := file.Sync(); err != nil {
return err
}
return nil
}
// remove deletes the snapshot file.
func (ss *Snapshot) remove() error {
if err := os.Remove(ss.Path); err != nil {
return err
}
return nil
}

View File

@@ -1,90 +0,0 @@
package raft
import (
"io"
"io/ioutil"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/coreos/raft/protobuf"
)
// The request sent to a server to start from the snapshot.
type SnapshotRecoveryRequest struct {
LeaderName string
LastIndex uint64
LastTerm uint64
Peers []*Peer
State []byte
}
// Creates a new Snapshot request.
func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *SnapshotRecoveryRequest {
return &SnapshotRecoveryRequest{
LeaderName: leaderName,
LastIndex: snapshot.LastIndex,
LastTerm: snapshot.LastTerm,
Peers: snapshot.Peers,
State: snapshot.State,
}
}
// Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error) {
protoPeers := make([]*protobuf.SnapshotRecoveryRequest_Peer, len(req.Peers))
for i, peer := range req.Peers {
protoPeers[i] = &protobuf.SnapshotRecoveryRequest_Peer{
Name: proto.String(peer.Name),
ConnectionString: proto.String(peer.ConnectionString),
}
}
pb := &protobuf.SnapshotRecoveryRequest{
LeaderName: proto.String(req.LeaderName),
LastIndex: proto.Uint64(req.LastIndex),
LastTerm: proto.Uint64(req.LastTerm),
Peers: protoPeers,
State: req.State,
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the SnapshotRecoveryRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
totalBytes := len(data)
pb := &protobuf.SnapshotRecoveryRequest{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
req.LeaderName = pb.GetLeaderName()
req.LastIndex = pb.GetLastIndex()
req.LastTerm = pb.GetLastTerm()
req.State = pb.GetState()
req.Peers = make([]*Peer, len(pb.Peers))
for i, peer := range pb.Peers {
req.Peers[i] = &Peer{
Name: peer.GetName(),
ConnectionString: peer.GetConnectionString(),
}
}
return totalBytes, nil
}

View File

@@ -1,63 +0,0 @@
package raft
import (
"io"
"io/ioutil"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/coreos/raft/protobuf"
)
// The response returned from a server appending entries to the log.
type SnapshotRecoveryResponse struct {
Term uint64
Success bool
CommitIndex uint64
}
// Creates a new Snapshot response.
func newSnapshotRecoveryResponse(term uint64, success bool, commitIndex uint64) *SnapshotRecoveryResponse {
return &SnapshotRecoveryResponse{
Term: term,
Success: success,
CommitIndex: commitIndex,
}
}
// Encode writes the response to a writer.
// Returns the number of bytes written and any error that occurs.
func (req *SnapshotRecoveryResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.SnapshotRecoveryResponse{
Term: proto.Uint64(req.Term),
Success: proto.Bool(req.Success),
CommitIndex: proto.Uint64(req.CommitIndex),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the SnapshotRecoveryResponse from a buffer.
func (req *SnapshotRecoveryResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
totalBytes := len(data)
pb := &protobuf.SnapshotRecoveryResponse{}
if err := proto.Unmarshal(data, pb); err != nil {
return -1, err
}
req.Term = pb.GetTerm()
req.Success = pb.GetSuccess()
req.CommitIndex = pb.GetCommitIndex()
return totalBytes, nil
}

View File

@@ -1,65 +0,0 @@
package raft
import (
"io"
"io/ioutil"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/coreos/raft/protobuf"
)
// The request sent to a server to start from the snapshot.
type SnapshotRequest struct {
LeaderName string
LastIndex uint64
LastTerm uint64
}
// Creates a new Snapshot request.
func newSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest {
return &SnapshotRequest{
LeaderName: leaderName,
LastIndex: snapshot.LastIndex,
LastTerm: snapshot.LastTerm,
}
}
// Encodes the SnapshotRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRequest) Encode(w io.Writer) (int, error) {
pb := &protobuf.SnapshotRequest{
LeaderName: proto.String(req.LeaderName),
LastIndex: proto.Uint64(req.LastIndex),
LastTerm: proto.Uint64(req.LastTerm),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the SnapshotRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *SnapshotRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
totalBytes := len(data)
pb := &protobuf.SnapshotRequest{}
if err := proto.Unmarshal(data, pb); err != nil {
return -1, err
}
req.LeaderName = pb.GetLeaderName()
req.LastIndex = pb.GetLastIndex()
req.LastTerm = pb.GetLastTerm()
return totalBytes, nil
}

View File

@@ -1,56 +0,0 @@
package raft
import (
"io"
"io/ioutil"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/coreos/raft/protobuf"
)
// The response returned if the follower entered snapshot state
type SnapshotResponse struct {
Success bool `json:"success"`
}
// Creates a new Snapshot response.
func newSnapshotResponse(success bool) *SnapshotResponse {
return &SnapshotResponse{
Success: success,
}
}
// Encodes the SnapshotResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *SnapshotResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.SnapshotResponse{
Success: proto.Bool(resp.Success),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the SnapshotResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *SnapshotResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
totalBytes := len(data)
pb := &protobuf.SnapshotResponse{}
if err := proto.Unmarshal(data, pb); err != nil {
return -1, err
}
resp.Success = pb.GetSuccess()
return totalBytes, nil
}

View File

@@ -1,23 +0,0 @@
package raft
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
type uint64Slice []uint64
//------------------------------------------------------------------------------
//
// Functions
//
//------------------------------------------------------------------------------
//--------------------------------------
// uint64
//--------------------------------------
func (p uint64Slice) Len() int { return len(p) }
func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@@ -1,17 +0,0 @@
package raft
import (
"math/rand"
"time"
)
// Waits for a random time between two durations and sends the current time on
// the returned channel.
func afterBetween(min time.Duration, max time.Duration) <-chan time.Time {
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
d, delta := min, (max - min)
if delta > 0 {
d += time.Duration(rand.Int63n(int64(delta)))
}
return time.After(d)
}

View File

@@ -1,31 +0,0 @@
package raft
import (
"io"
"os"
)
// WriteFile writes data to a file named by filename.
// If the file does not exist, WriteFile creates it with permissions perm;
// otherwise WriteFile truncates it before writing.
// This is copied from ioutil.WriteFile with the addition of a Sync call to
// ensure the data reaches the disk.
func writeFileSynced(filename string, data []byte, perm os.FileMode) error {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
n, err := f.Write(data)
if n < len(data) {
f.Close()
return io.ErrShortWrite
}
err = f.Sync()
if err != nil {
return err
}
return f.Close()
}

View File

@@ -36,6 +36,7 @@ These projects are built on go-raft:
- [goraft/raftd](https://github.com/goraft/raftd) - A reference implementation for using the go-raft library for distributed consensus.
- [skynetservices/skydns](https://github.com/skynetservices/skydns) - DNS for skynet or any other service discovery.
- [influxdb/influxdb](https://github.com/influxdb/influxdb) - An open-source, distributed, time series, events, and metrics database.
- [Weed File System](https://weed-fs.googlecode.com) - A scalable distributed key-to-file system with O(1) disk access for each read.
If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples.

View File

@@ -0,0 +1,146 @@
package raft
import (
"io"
"io/ioutil"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/goraft/raft/protobuf"
)
// The request sent to a server to append entries to the log.
type AppendEntriesRequest struct {
Term uint64
PrevLogIndex uint64
PrevLogTerm uint64
CommitIndex uint64
LeaderName string
Entries []*protobuf.LogEntry
}
// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
pb *protobuf.AppendEntriesResponse
peer string
append bool
}
// Creates a new AppendEntries request.
func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint64,
commitIndex uint64, leaderName string, entries []*LogEntry) *AppendEntriesRequest {
pbEntries := make([]*protobuf.LogEntry, len(entries))
for i := range entries {
pbEntries[i] = entries[i].pb
}
return &AppendEntriesRequest{
Term: term,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
CommitIndex: commitIndex,
LeaderName: leaderName,
Entries: pbEntries,
}
}
// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
pb := &protobuf.AppendEntriesRequest{
Term: proto.Uint64(req.Term),
PrevLogIndex: proto.Uint64(req.PrevLogIndex),
PrevLogTerm: proto.Uint64(req.PrevLogTerm),
CommitIndex: proto.Uint64(req.CommitIndex),
LeaderName: proto.String(req.LeaderName),
Entries: req.Entries,
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return -1, err
}
pb := new(protobuf.AppendEntriesRequest)
if err := proto.Unmarshal(data, pb); err != nil {
return -1, err
}
req.Term = pb.GetTerm()
req.PrevLogIndex = pb.GetPrevLogIndex()
req.PrevLogTerm = pb.GetPrevLogTerm()
req.CommitIndex = pb.GetCommitIndex()
req.LeaderName = pb.GetLeaderName()
req.Entries = pb.GetEntries()
return len(data), nil
}
// Creates a new AppendEntries response.
func newAppendEntriesResponse(term uint64, success bool, index uint64, commitIndex uint64) *AppendEntriesResponse {
pb := &protobuf.AppendEntriesResponse{
Term: proto.Uint64(term),
Index: proto.Uint64(index),
Success: proto.Bool(success),
CommitIndex: proto.Uint64(commitIndex),
}
return &AppendEntriesResponse{
pb: pb,
}
}
func (aer *AppendEntriesResponse) Index() uint64 {
return aer.pb.GetIndex()
}
func (aer *AppendEntriesResponse) CommitIndex() uint64 {
return aer.pb.GetCommitIndex()
}
func (aer *AppendEntriesResponse) Term() uint64 {
return aer.pb.GetTerm()
}
func (aer *AppendEntriesResponse) Success() bool {
return aer.pb.GetSuccess()
}
// Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error) {
b, err := proto.Marshal(resp.pb)
if err != nil {
return -1, err
}
return w.Write(b)
}
// Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return -1, err
}
resp.pb = new(protobuf.AppendEntriesResponse)
if err := proto.Unmarshal(data, resp.pb); err != nil {
return -1, err
}
return len(data), nil
}

View File

@@ -24,6 +24,25 @@ func BenchmarkAppendEntriesRequestDecoding(b *testing.B) {
b.SetBytes(int64(len(buf)))
}
func BenchmarkAppendEntriesResponseEncoding(b *testing.B) {
req, tmp := createTestAppendEntriesResponse(2000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
var buf bytes.Buffer
req.Encode(&buf)
}
b.SetBytes(int64(len(tmp)))
}
func BenchmarkAppendEntriesResponseDecoding(b *testing.B) {
req, buf := createTestAppendEntriesResponse(2000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
req.Decode(bytes.NewReader(buf))
}
b.SetBytes(int64(len(buf)))
}
func createTestAppendEntriesRequest(entryCount int) (*AppendEntriesRequest, []byte) {
entries := make([]*LogEntry, 0)
for i := 0; i < entryCount; i++ {
@@ -38,3 +57,12 @@ func createTestAppendEntriesRequest(entryCount int) (*AppendEntriesRequest, []by
return req, buf.Bytes()
}
func createTestAppendEntriesResponse(entryCount int) (*AppendEntriesResponse, []byte) {
resp := newAppendEntriesResponse(1, true, 1, 1)
var buf bytes.Buffer
resp.Encode(&buf)
return resp, buf.Bytes()
}

View File

@@ -0,0 +1,78 @@
package raft
import (
"io"
)
// Join command interface
type JoinCommand interface {
Command
NodeName() string
}
// Join command
type DefaultJoinCommand struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
}
// Leave command interface
type LeaveCommand interface {
Command
NodeName() string
}
// Leave command
type DefaultLeaveCommand struct {
Name string `json:"name"`
}
// NOP command
type NOPCommand struct {
}
// The name of the Join command in the log
func (c *DefaultJoinCommand) CommandName() string {
return "raft:join"
}
func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {
err := server.AddPeer(c.Name, c.ConnectionString)
return []byte("join"), err
}
func (c *DefaultJoinCommand) NodeName() string {
return c.Name
}
// The name of the Leave command in the log
func (c *DefaultLeaveCommand) CommandName() string {
return "raft:leave"
}
func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error) {
err := server.RemovePeer(c.Name)
return []byte("leave"), err
}
func (c *DefaultLeaveCommand) NodeName() string {
return c.Name
}
// The name of the NOP command in the log
func (c NOPCommand) CommandName() string {
return "raft:nop"
}
func (c NOPCommand) Apply(server Server) (interface{}, error) {
return nil, nil
}
func (c NOPCommand) Encode(w io.Writer) error {
return nil
}
func (c NOPCommand) Decode(r io.Reader) error {
return nil
}

View File

@@ -8,7 +8,7 @@ import (
"os"
"sync"
"github.com/coreos/etcd/third_party/github.com/coreos/raft/protobuf"
"github.com/coreos/etcd/third_party/github.com/goraft/raft/protobuf"
)
//------------------------------------------------------------------------------

View File

@@ -7,7 +7,7 @@ import (
"io"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/coreos/raft/protobuf"
"github.com/coreos/etcd/third_party/github.com/goraft/raft/protobuf"
)
// A log entry stores a single item in the log.
@@ -91,7 +91,7 @@ func (e *LogEntry) Decode(r io.Reader) (int, error) {
}
data := make([]byte, length)
_, err = r.Read(data)
_, err = io.ReadFull(r, data)
if err != nil {
return -1, err

View File

@@ -68,11 +68,6 @@ func (p *Peer) setPrevLogIndex(value uint64) {
p.prevLogIndex = value
}
// LastActivity returns the last time any response was received from the peer.
func (p *Peer) LastActivity() time.Time {
return p.lastActivity
}
//------------------------------------------------------------------------------
//
// Methods
@@ -96,6 +91,11 @@ func (p *Peer) stopHeartbeat(flush bool) {
p.stopChan <- flush
}
// LastActivity returns the last time any response was received from the peer.
func (p *Peer) LastActivity() time.Time {
return p.lastActivity
}
//--------------------------------------
// Copying
//--------------------------------------
@@ -160,7 +160,7 @@ func (p *Peer) flush() {
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.snapshot))
}
}
@@ -263,7 +263,7 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
// Sends an Snapshot Recovery request to the peer through the transport.
func (p *Peer) sendSnapshotRecoveryRequest() {
req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
req := newSnapshotRecoveryRequest(p.server.name, p.server.snapshot)
debugln("peer.snap.recovery.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)

View File

@@ -0,0 +1,122 @@
package raft
import (
"io"
"io/ioutil"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/goraft/raft/protobuf"
)
// The request sent to a server to vote for a candidate to become a leader.
type RequestVoteRequest struct {
peer *Peer
Term uint64
LastLogIndex uint64
LastLogTerm uint64
CandidateName string
}
// The response returned from a server after a vote for a candidate to become a leader.
type RequestVoteResponse struct {
peer *Peer
Term uint64
VoteGranted bool
}
// Creates a new RequestVote request.
func newRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint64, lastLogTerm uint64) *RequestVoteRequest {
return &RequestVoteRequest{
Term: term,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
CandidateName: candidateName,
}
}
// Encodes the RequestVoteRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *RequestVoteRequest) Encode(w io.Writer) (int, error) {
pb := &protobuf.RequestVoteRequest{
Term: proto.Uint64(req.Term),
LastLogIndex: proto.Uint64(req.LastLogIndex),
LastLogTerm: proto.Uint64(req.LastLogTerm),
CandidateName: proto.String(req.CandidateName),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the RequestVoteRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *RequestVoteRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return -1, err
}
totalBytes := len(data)
pb := &protobuf.RequestVoteRequest{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
req.Term = pb.GetTerm()
req.LastLogIndex = pb.GetLastLogIndex()
req.LastLogTerm = pb.GetLastLogTerm()
req.CandidateName = pb.GetCandidateName()
return totalBytes, nil
}
// Creates a new RequestVote response.
func newRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse {
return &RequestVoteResponse{
Term: term,
VoteGranted: voteGranted,
}
}
// Encodes the RequestVoteResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *RequestVoteResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.RequestVoteResponse{
Term: proto.Uint64(resp.Term),
VoteGranted: proto.Bool(resp.VoteGranted),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the RequestVoteResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *RequestVoteResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
totalBytes := len(data)
pb := &protobuf.RequestVoteResponse{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
resp.Term = pb.GetTerm()
resp.VoteGranted = pb.GetVoteGranted()
return totalBytes, nil
}

View File

@@ -21,6 +21,7 @@ import (
const (
Stopped = "stopped"
Initialized = "initialized"
Follower = "follower"
Candidate = "candidate"
Leader = "leader"
@@ -45,8 +46,6 @@ const (
// election timeout.
const ElectionTimeoutThresholdPercent = 0.8
var stopValue interface{}
//------------------------------------------------------------------------------
//
// Errors
@@ -96,6 +95,7 @@ type Server interface {
AddPeer(name string, connectiongString string) error
RemovePeer(name string) error
Peers() map[string]*Peer
Init() error
Start() error
Stop()
Running() bool
@@ -103,6 +103,7 @@ type Server interface {
TakeSnapshot() error
LoadSnapshot() error
AddEventListener(string, EventListener)
FlushCommitIndex()
}
type server struct {
@@ -122,13 +123,19 @@ type server struct {
mutex sync.RWMutex
syncedPeer map[string]bool
stopped chan bool
stopped chan chan bool
c chan *ev
electionTimeout time.Duration
heartbeatInterval time.Duration
currentSnapshot *Snapshot
lastSnapshot *Snapshot
snapshot *Snapshot
// PendingSnapshot is an unfinished snapshot.
// After the pendingSnapshot is saved to disk,
// it will be set to snapshot and also will be
// set to nil.
pendingSnapshot *Snapshot
stateMachine StateMachine
maxLogEntriesPerRequest uint64
@@ -170,7 +177,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
state: Stopped,
peers: make(map[string]*Peer),
log: newLog(),
stopped: make(chan bool),
stopped: make(chan chan bool),
c: make(chan *ev, 256),
electionTimeout: DefaultElectionTimeout,
heartbeatInterval: DefaultHeartbeatInterval,
@@ -292,9 +299,8 @@ func (s *server) setState(state string) {
}
// Dispatch state and leader change events.
if prevState != state {
s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
}
if prevLeader != s.leader {
s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
}
@@ -419,35 +425,21 @@ func init() {
RegisterCommand(&DefaultLeaveCommand{})
}
// Start as follow
// Start the raft server
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit entry.
func (s *server) Start() error {
// Exit if the server is already running.
if s.State() != Stopped {
return errors.New("raft.Server: Server already running")
if s.Running() {
return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
}
// Create snapshot directory if not exist
os.Mkdir(path.Join(s.path, "snapshot"), 0700)
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
if err := s.Init(); err != nil {
return err
}
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
// Update the term to the last term in the log.
_, s.currentTerm = s.log.lastInfo()
s.setState(Follower)
// If no log entries exist then
@@ -470,39 +462,78 @@ func (s *server) Start() error {
return nil
}
// Init initializes the raft server
func (s *server) Init() error {
if s.Running() {
return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
}
// server has been initialized or server was stopped after initialized
if s.state == Initialized || !s.log.isEmpty() {
s.state = Initialized
return nil
}
// Create snapshot directory if it does not exist
err := os.Mkdir(path.Join(s.path, "snapshot"), 0700)
if err != nil && !os.IsExist(err) {
s.debugln("raft: Snapshot dir error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
// Update the term to the last term in the log.
_, s.currentTerm = s.log.lastInfo()
s.state = Initialized
return nil
}
// Shuts down the server.
func (s *server) Stop() {
s.send(&stopValue)
stop := make(chan bool)
s.stopped <- stop
s.state = Stopped
// make sure the server has stopped before we close the log
<-s.stopped
<-stop
s.log.close()
s.state = Stopped
}
// Checks if the server is currently running.
func (s *server) Running() bool {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.state != Stopped
return (s.state != Stopped && s.state != Initialized)
}
//--------------------------------------
// Term
//--------------------------------------
// Sets the current term for the server. This is only used when an external
// current term is found.
func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
// updates the current term for the server. This is only used when a larger
// external term is found.
func (s *server) updateCurrentTerm(term uint64, leaderName string) {
_assert(term > s.currentTerm,
"upadteCurrentTerm: update is called when term is not larger than currentTerm")
s.mutex.Lock()
defer s.mutex.Unlock()
// Store previous values temporarily.
prevState := s.state
prevTerm := s.currentTerm
prevLeader := s.leader
if term > s.currentTerm {
// set currentTerm = T, convert to follower (§5.1)
// stop heartbeats before step-down
if s.state == Leader {
s.mutex.Unlock()
@@ -512,27 +543,21 @@ func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
s.mutex.Lock()
}
// update the term and clear vote for
s.state = Follower
if s.state != Follower {
s.mutex.Unlock()
s.setState(Follower)
s.mutex.Lock()
}
s.currentTerm = term
s.leader = leaderName
s.votedFor = ""
} else if term == s.currentTerm && s.state != Leader && append {
// discover new leader when candidate
// save leader name when follower
s.state = Follower
s.leader = leaderName
}
// Dispatch change events.
if prevState != s.state {
s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
}
s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
if prevLeader != s.leader {
s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
}
if prevTerm != s.currentTerm {
s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
}
}
//--------------------------------------
@@ -555,26 +580,19 @@ func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
func (s *server) loop() {
defer s.debugln("server.loop.end")
for {
for s.state != Stopped {
state := s.State()
s.debugln("server.loop.run ", state)
switch state {
case Follower:
s.followerLoop()
case Candidate:
s.candidateLoop()
case Leader:
s.leaderLoop()
case Snapshotting:
s.snapshotLoop()
case Stopped:
s.stopped <- true
return
}
}
}
@@ -610,7 +628,6 @@ func (s *server) sendAsync(value interface{}) {
// 1.Receiving valid AppendEntries RPC, or
// 2.Granting vote to candidate
func (s *server) followerLoop() {
s.setState(Follower)
since := time.Now()
electionTimeout := s.ElectionTimeout()
timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
@@ -619,10 +636,12 @@ func (s *server) followerLoop() {
var err error
update := false
select {
case e := <-s.c:
if e.target == &stopValue {
case stop := <-s.stopped:
s.setState(Stopped)
} else {
stop <- true
return
case e := <-s.c:
switch req := e.target.(type) {
case JoinCommand:
//If no log entries exist and a self-join command is issued
@@ -648,13 +667,10 @@ func (s *server) followerLoop() {
default:
err = NotLeaderError
}
}
// Callback to event.
e.c <- err
case <-timeoutChan:
// only allow synced follower to promote to candidate
if s.promotable() {
s.setState(Candidate)
@@ -675,8 +691,6 @@ func (s *server) followerLoop() {
// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
lastLogIndex, lastLogTerm := s.log.lastInfo()
// Clear leader value.
prevLeader := s.leader
s.leader = ""
@@ -684,13 +698,20 @@ func (s *server) candidateLoop() {
s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
}
lastLogIndex, lastLogTerm := s.log.lastInfo()
doVote := true
votesGranted := 0
var timeoutChan <-chan time.Time
var respChan chan *RequestVoteResponse
for s.State() == Candidate {
if doVote {
// Increment current term, vote for self.
s.currentTerm++
s.votedFor = s.name
// Send RequestVote RPCs to all other servers.
respChan := make(chan *RequestVoteResponse, len(s.peers))
respChan = make(chan *RequestVoteResponse, len(s.peers))
for _, peer := range s.peers {
go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
}
@@ -700,36 +721,34 @@ func (s *server) candidateLoop() {
// * AppendEntries RPC received from new leader: step down.
// * Election timeout elapses without election resolution: increment term, start new election
// * Discover higher term: step down (§5.1)
votesGranted := 1
timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
timeout := false
votesGranted = 1
timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
doVote = false
}
for {
// If we received enough votes then stop waiting for more votes.
s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
if votesGranted >= s.QuorumSize() {
// And return from the candidate loop
if votesGranted == s.QuorumSize() {
s.debugln("server.candidate.recv.enough.votes")
s.setState(Leader)
break
return
}
// Collect votes from peers.
select {
case stop := <-s.stopped:
s.setState(Stopped)
stop <- true
return
case resp := <-respChan:
if resp.VoteGranted {
if success := s.processVoteResponse(resp); success {
s.debugln("server.candidate.vote.granted: ", votesGranted)
votesGranted++
} else if resp.Term > s.currentTerm {
s.debugln("server.candidate.vote.failed")
s.setCurrentTerm(resp.Term, "", false)
} else {
s.debugln("server.candidate.vote: denied")
}
case e := <-s.c:
var err error
if e.target == &stopValue {
s.setState(Stopped)
} else {
switch req := e.target.(type) {
case Command:
err = NotLeaderError
@@ -738,27 +757,18 @@ func (s *server) candidateLoop() {
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
}
// Callback to event.
e.c <- err
case <-timeoutChan:
timeout = true
doVote = true
}
// both process AER and RVR can make the server to follower
// also break when timeout happens
if s.State() != Candidate || timeout {
break
}
}
// continue when timeout happened
}
}
// The event loop that is run when the server is in a Leader state.
func (s *server) leaderLoop() {
s.setState(Leader)
logIndex, _ := s.log.lastInfo()
// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
@@ -778,14 +788,16 @@ func (s *server) leaderLoop() {
for s.State() == Leader {
var err error
select {
case e := <-s.c:
if e.target == &stopValue {
case stop := <-s.stopped:
// Stop all peers before stop
for _, peer := range s.peers {
peer.stopHeartbeat(false)
}
s.setState(Stopped)
} else {
stop <- true
return
case e := <-s.c:
switch req := e.target.(type) {
case Command:
s.processCommand(req, e)
@@ -797,7 +809,6 @@ func (s *server) leaderLoop() {
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
}
// Callback to event.
e.c <- err
@@ -808,16 +819,15 @@ func (s *server) leaderLoop() {
}
func (s *server) snapshotLoop() {
s.setState(Snapshotting)
for s.State() == Snapshotting {
var err error
e := <-s.c
if e.target == &stopValue {
select {
case stop := <-s.stopped:
s.setState(Stopped)
} else {
stop <- true
return
case e := <-s.c:
switch req := e.target.(type) {
case Command:
err = NotLeaderError
@@ -828,11 +838,11 @@ func (s *server) snapshotLoop() {
case *SnapshotRecoveryRequest:
e.returnValue = s.processSnapshotRecoveryRequest(req)
}
}
// Callback to event.
e.c <- err
}
}
}
//--------------------------------------
// Commands
@@ -892,8 +902,17 @@ func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
}
if req.Term == s.currentTerm {
_assert(s.state != Leader, "leader.elected.at.same.term.%d\n", s.currentTerm)
// change state to follower
s.state = Follower
// discover new leader when candidate
// save leader name when follower
s.leader = req.LeaderName
} else {
// Update term and leader.
s.setCurrentTerm(req.Term, req.LeaderName, true)
s.updateCurrentTerm(req.Term, req.LeaderName)
}
// Reject if log doesn't contain a matching previous entry.
if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
@@ -924,7 +943,7 @@ func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
// If we find a higher term then change to a follower and exit.
if resp.Term() > s.Term() {
s.setCurrentTerm(resp.Term(), "", false)
s.updateCurrentTerm(resp.Term(), "")
return
}
@@ -964,6 +983,25 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
}
}
// processVoteReponse processes a vote request:
// 1. if the vote is granted for the current term of the candidate, return true
// 2. if the vote is denied due to smaller term, update the term of this server
// which will also cause the candidate to step-down, and return false.
// 3. if the vote is for a smaller term, ignore it and return false.
func (s *server) processVoteResponse(resp *RequestVoteResponse) bool {
if resp.VoteGranted && resp.Term == s.currentTerm {
return true
}
if resp.Term > s.currentTerm {
s.debugln("server.candidate.vote.failed")
s.updateCurrentTerm(resp.Term, "")
} else {
s.debugln("server.candidate.vote: denied")
}
return false
}
//--------------------------------------
// Request Vote
//--------------------------------------
@@ -986,10 +1024,12 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
return newRequestVoteResponse(s.currentTerm, false), false
}
s.setCurrentTerm(req.Term, "", false)
// If we've already voted for a different candidate then don't vote for this candidate.
if s.votedFor != "" && s.votedFor != req.CandidateName {
// If the term of the request peer is larger than this node, update the term
// If the term is equal and we've already voted for a different candidate then
// don't vote for this candidate.
if req.Term > s.Term() {
s.updateCurrentTerm(req.Term, "")
} else if s.votedFor != "" && s.votedFor != req.CandidateName {
s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
" already vote for ", s.votedFor)
return newRequestVoteResponse(s.currentTerm, false), false
@@ -1057,7 +1097,15 @@ func (s *server) RemovePeer(name string) error {
// Stop peer and remove it.
if s.State() == Leader {
peer.stopHeartbeat(true)
// We create a go routine here to avoid potential deadlock.
// We are holding log write lock when reach this line of code.
// Peer.stopHeartbeat can be blocked without go routine, if the
// target go routine (which we want to stop) is calling
// log.getEntriesAfter and waiting for log read lock.
// So we might be holding log lock and waiting for log lock,
// which lead to a deadlock.
// TODO(xiangli) refactor log lock
go peer.stopHeartbeat(true)
}
delete(s.peers, name)
@@ -1076,31 +1124,36 @@ func (s *server) RemovePeer(name string) error {
//--------------------------------------
func (s *server) TakeSnapshot() error {
// TODO: put a snapshot mutex
s.debugln("take Snapshot")
if s.stateMachine == nil {
return errors.New("Snapshot: Cannot create snapshot. Missing state machine.")
}
// Shortcut without lock
// Exit if the server is currently creating a snapshot.
if s.currentSnapshot != nil {
return errors.New("handling snapshot")
if s.pendingSnapshot != nil {
return errors.New("Snapshot: Last snapshot is not finished.")
}
// Exit if there are no logs yet in the system.
// TODO: acquire the lock and no more committed is allowed
// This will be done after finishing refactoring heartbeat
s.debugln("take.snapshot")
lastIndex, lastTerm := s.log.commitInfo()
path := s.SnapshotPath(lastIndex, lastTerm)
if lastIndex == 0 {
return errors.New("No logs")
// check if there is log has been committed since the
// last snapshot.
if lastIndex == s.log.startIndex {
return nil
}
var state []byte
var err error
if s.stateMachine != nil {
state, err = s.stateMachine.Save()
path := s.SnapshotPath(lastIndex, lastTerm)
// Attach snapshot to pending snapshot and save it to disk.
s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}
state, err := s.stateMachine.Save()
if err != nil {
return err
}
} else {
state = []byte{0}
}
// Clone the list of peers.
peers := make([]*Peer, 0, len(s.peers)+1)
@@ -1109,8 +1162,9 @@ func (s *server) TakeSnapshot() error {
}
peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})
// Attach current snapshot and save it to disk.
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
// Attach snapshot to pending snapshot and save it to disk.
s.pendingSnapshot.Peers = peers
s.pendingSnapshot.State = state
s.saveSnapshot()
// We keep some log entries after the snapshot.
@@ -1126,24 +1180,24 @@ func (s *server) TakeSnapshot() error {
// Retrieves the log path for the server.
func (s *server) saveSnapshot() error {
if s.currentSnapshot == nil {
return errors.New("no snapshot to save")
if s.pendingSnapshot == nil {
return errors.New("pendingSnapshot.is.nil")
}
// Write snapshot to disk.
if err := s.currentSnapshot.save(); err != nil {
if err := s.pendingSnapshot.save(); err != nil {
return err
}
// Swap the current and last snapshots.
tmp := s.lastSnapshot
s.lastSnapshot = s.currentSnapshot
tmp := s.snapshot
s.snapshot = s.pendingSnapshot
// Delete the previous snapshot if there is any change
if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
if tmp != nil && !(tmp.LastIndex == s.snapshot.LastIndex && tmp.LastTerm == s.snapshot.LastTerm) {
tmp.remove()
}
s.currentSnapshot = nil
s.pendingSnapshot = nil
return nil
}
@@ -1184,7 +1238,7 @@ func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *Snapshot
func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
// Recover state sent from request.
if err := s.stateMachine.Recovery(req.State); err != nil {
return newSnapshotRecoveryResponse(req.LastTerm, false, req.LastIndex)
panic("cannot recover from previous state")
}
// Recover the cluster configuration.
@@ -1198,14 +1252,13 @@ func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S
s.log.updateCommitIndex(req.LastIndex)
// Create local snapshot.
s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
s.pendingSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
s.saveSnapshot()
// Clear the previous log entries.
s.log.compact(req.LastIndex, req.LastTerm)
return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}
// Load a snapshot at restart
@@ -1213,6 +1266,7 @@ func (s *server) LoadSnapshot() error {
// Open snapshot/ directory.
dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
if err != nil {
s.debugln("cannot.open.snapshot: ", err)
return err
}
@@ -1225,7 +1279,8 @@ func (s *server) LoadSnapshot() error {
dir.Close()
if len(filenames) == 0 {
return errors.New("no snapshot")
s.debugln("no.snapshot.to.load")
return nil
}
// Grab the latest snapshot.
@@ -1245,7 +1300,7 @@ func (s *server) LoadSnapshot() error {
if err != nil {
return err
} else if n != 1 {
return errors.New("Bad snapshot file")
return errors.New("checksum.err: bad.snapshot.file")
}
// Load remaining snapshot contents.
@@ -1262,26 +1317,26 @@ func (s *server) LoadSnapshot() error {
}
// Decode snapshot.
if err = json.Unmarshal(b, &s.lastSnapshot); err != nil {
s.debugln("unmarshal error: ", err)
if err = json.Unmarshal(b, &s.snapshot); err != nil {
s.debugln("unmarshal.snapshot.error: ", err)
return err
}
// Recover snapshot into state machine.
if err = s.stateMachine.Recovery(s.lastSnapshot.State); err != nil {
s.debugln("recovery error: ", err)
if err = s.stateMachine.Recovery(s.snapshot.State); err != nil {
s.debugln("recovery.snapshot.error: ", err)
return err
}
// Recover cluster configuration.
for _, peer := range s.lastSnapshot.Peers {
for _, peer := range s.snapshot.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
// Update log state.
s.log.startTerm = s.lastSnapshot.LastTerm
s.log.startIndex = s.lastSnapshot.LastIndex
s.log.updateCommitIndex(s.lastSnapshot.LastIndex)
s.log.startTerm = s.snapshot.LastTerm
s.log.startIndex = s.snapshot.LastIndex
s.log.updateCommitIndex(s.snapshot.LastIndex)
return err
}
@@ -1290,6 +1345,14 @@ func (s *server) LoadSnapshot() error {
// Config File
//--------------------------------------
// Flushes commit index to the disk.
// So when the raft server restarts, it will commit upto the flushed commitIndex.
func (s *server) FlushCommitIndex() {
s.debugln("server.conf.update")
// Write the configuration to file.
s.writeConf()
}
func (s *server) writeConf() {
peers := make([]*Peer, len(s.peers))

View File

@@ -139,6 +139,47 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
}
}
func TestProcessVoteResponse(t *testing.T) {
// server Term: 0, status: Leader
// response Term : 1, granted
// Expectation: not success
// Server Term 1 status:Leader
server := &server{}
server.eventDispatcher = newEventDispatcher(server)
server.currentTerm = 0
server.state = Leader
response := &RequestVoteResponse{
VoteGranted: true,
Term: 1,
}
if success := server.processVoteResponse(response); success {
t.Fatal("Process should fail if the resp's term is larger than server's")
}
if server.state != Follower {
t.Fatal("Server should stepdown")
}
// server Term: 1, status: Follower
// response Term: 2, granted
// Expectation: not success
response.Term = 2
if success := server.processVoteResponse(response); success {
t.Fatal("Process should fail if the resp's term is larger than server's")
}
if server.state != Follower {
t.Fatal("Server should still be Follower")
}
server.currentTerm = 2
// server Term: 2, status: Follower
// response Term: 2
// Expectation: success
if success := server.processVoteResponse(response); !success {
t.Fatal("Process should success if the server's term is larger than resp's")
}
}
// //--------------------------------------
// // Promotion
// //--------------------------------------

View File

@@ -0,0 +1,304 @@
package raft
import (
"encoding/json"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
"github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto"
"github.com/coreos/etcd/third_party/github.com/goraft/raft/protobuf"
)
// Snapshot represents an in-memory representation of the current state of the system.
type Snapshot struct {
LastIndex uint64 `json:"lastIndex"`
LastTerm uint64 `json:"lastTerm"`
// Cluster configuration.
Peers []*Peer `json:"peers"`
State []byte `json:"state"`
Path string `json:"path"`
}
// The request sent to a server to start from the snapshot.
type SnapshotRecoveryRequest struct {
LeaderName string
LastIndex uint64
LastTerm uint64
Peers []*Peer
State []byte
}
// The response returned from a server appending entries to the log.
type SnapshotRecoveryResponse struct {
Term uint64
Success bool
CommitIndex uint64
}
// The request sent to a server to start from the snapshot.
type SnapshotRequest struct {
LeaderName string
LastIndex uint64
LastTerm uint64
}
// The response returned if the follower entered snapshot state
type SnapshotResponse struct {
Success bool `json:"success"`
}
// save writes the snapshot to file.
func (ss *Snapshot) save() error {
// Open the file for writing.
file, err := os.OpenFile(ss.Path, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
defer file.Close()
// Serialize to JSON.
b, err := json.Marshal(ss)
if err != nil {
return err
}
// Generate checksum and write it to disk.
checksum := crc32.ChecksumIEEE(b)
if _, err = fmt.Fprintf(file, "%08x\n", checksum); err != nil {
return err
}
// Write the snapshot to disk.
if _, err = file.Write(b); err != nil {
return err
}
// Ensure that the snapshot has been flushed to disk before continuing.
if err := file.Sync(); err != nil {
return err
}
return nil
}
// remove deletes the snapshot file.
func (ss *Snapshot) remove() error {
if err := os.Remove(ss.Path); err != nil {
return err
}
return nil
}
// Creates a new Snapshot request.
func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *SnapshotRecoveryRequest {
return &SnapshotRecoveryRequest{
LeaderName: leaderName,
LastIndex: snapshot.LastIndex,
LastTerm: snapshot.LastTerm,
Peers: snapshot.Peers,
State: snapshot.State,
}
}
// Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error) {
protoPeers := make([]*protobuf.SnapshotRecoveryRequest_Peer, len(req.Peers))
for i, peer := range req.Peers {
protoPeers[i] = &protobuf.SnapshotRecoveryRequest_Peer{
Name: proto.String(peer.Name),
ConnectionString: proto.String(peer.ConnectionString),
}
}
pb := &protobuf.SnapshotRecoveryRequest{
LeaderName: proto.String(req.LeaderName),
LastIndex: proto.Uint64(req.LastIndex),
LastTerm: proto.Uint64(req.LastTerm),
Peers: protoPeers,
State: req.State,
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the SnapshotRecoveryRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
totalBytes := len(data)
pb := &protobuf.SnapshotRecoveryRequest{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
req.LeaderName = pb.GetLeaderName()
req.LastIndex = pb.GetLastIndex()
req.LastTerm = pb.GetLastTerm()
req.State = pb.GetState()
req.Peers = make([]*Peer, len(pb.Peers))
for i, peer := range pb.Peers {
req.Peers[i] = &Peer{
Name: peer.GetName(),
ConnectionString: peer.GetConnectionString(),
}
}
return totalBytes, nil
}
// Creates a new Snapshot response.
func newSnapshotRecoveryResponse(term uint64, success bool, commitIndex uint64) *SnapshotRecoveryResponse {
return &SnapshotRecoveryResponse{
Term: term,
Success: success,
CommitIndex: commitIndex,
}
}
// Encode writes the response to a writer.
// Returns the number of bytes written and any error that occurs.
func (req *SnapshotRecoveryResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.SnapshotRecoveryResponse{
Term: proto.Uint64(req.Term),
Success: proto.Bool(req.Success),
CommitIndex: proto.Uint64(req.CommitIndex),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the SnapshotRecoveryResponse from a buffer.
func (req *SnapshotRecoveryResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
totalBytes := len(data)
pb := &protobuf.SnapshotRecoveryResponse{}
if err := proto.Unmarshal(data, pb); err != nil {
return -1, err
}
req.Term = pb.GetTerm()
req.Success = pb.GetSuccess()
req.CommitIndex = pb.GetCommitIndex()
return totalBytes, nil
}
// Creates a new Snapshot request.
func newSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest {
return &SnapshotRequest{
LeaderName: leaderName,
LastIndex: snapshot.LastIndex,
LastTerm: snapshot.LastTerm,
}
}
// Encodes the SnapshotRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRequest) Encode(w io.Writer) (int, error) {
pb := &protobuf.SnapshotRequest{
LeaderName: proto.String(req.LeaderName),
LastIndex: proto.Uint64(req.LastIndex),
LastTerm: proto.Uint64(req.LastTerm),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the SnapshotRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *SnapshotRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
totalBytes := len(data)
pb := &protobuf.SnapshotRequest{}
if err := proto.Unmarshal(data, pb); err != nil {
return -1, err
}
req.LeaderName = pb.GetLeaderName()
req.LastIndex = pb.GetLastIndex()
req.LastTerm = pb.GetLastTerm()
return totalBytes, nil
}
// Creates a new Snapshot response.
func newSnapshotResponse(success bool) *SnapshotResponse {
return &SnapshotResponse{
Success: success,
}
}
// Encodes the SnapshotResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *SnapshotResponse) Encode(w io.Writer) (int, error) {
pb := &protobuf.SnapshotResponse{
Success: proto.Bool(resp.Success),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}
return w.Write(p)
}
// Decodes the SnapshotResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *SnapshotResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
totalBytes := len(data)
pb := &protobuf.SnapshotResponse{}
if err := proto.Unmarshal(data, pb); err != nil {
return -1, err
}
resp.Success = pb.GetSuccess()
return totalBytes, nil
}

View File

@@ -16,13 +16,13 @@ func TestSnapshot(t *testing.T) {
s.Do(&testCommand1{})
err := s.TakeSnapshot()
assert.NoError(t, err)
assert.Equal(t, s.(*server).lastSnapshot.LastIndex, uint64(2))
assert.Equal(t, s.(*server).snapshot.LastIndex, uint64(2))
// Repeat to make sure new snapshot gets created.
s.Do(&testCommand1{})
err = s.TakeSnapshot()
assert.NoError(t, err)
assert.Equal(t, s.(*server).lastSnapshot.LastIndex, uint64(4))
assert.Equal(t, s.(*server).snapshot.LastIndex, uint64(4))
// Restart server.
s.Stop()

View File

@@ -0,0 +1,61 @@
package raft
import (
"fmt"
"io"
"math/rand"
"os"
"time"
)
// uint64Slice implements sort interface
type uint64Slice []uint64
func (p uint64Slice) Len() int { return len(p) }
func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// WriteFile writes data to a file named by filename.
// If the file does not exist, WriteFile creates it with permissions perm;
// otherwise WriteFile truncates it before writing.
// This is copied from ioutil.WriteFile with the addition of a Sync call to
// ensure the data reaches the disk.
func writeFileSynced(filename string, data []byte, perm os.FileMode) error {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
n, err := f.Write(data)
if n < len(data) {
f.Close()
return io.ErrShortWrite
}
err = f.Sync()
if err != nil {
return err
}
return f.Close()
}
// Waits for a random time between two durations and sends the current time on
// the returned channel.
func afterBetween(min time.Duration, max time.Duration) <-chan time.Time {
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
d, delta := min, (max - min)
if delta > 0 {
d += time.Duration(rand.Int63n(int64(delta)))
}
return time.After(d)
}
// TODO(xiangli): Remove assertions when we reach version 1.0
// _assert will panic with a given formatted message if the given condition is false.
func _assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assertion failed: "+msg, v...))
}
}