Merge pull request #128 from xiangli-cmu/bump

Bump
This commit is contained in:
Xiang Li 2013-08-18 21:35:55 -07:00
commit e091923311
34 changed files with 768 additions and 331 deletions

View File

@ -155,7 +155,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
addNameToURL(c.Name, c.RaftURL, c.EtcdURL)
// add peer in raft
err := raftServer.AddPeer(c.Name)
err := raftServer.AddPeer(c.Name, "")
// add machine in etcd storage
key := path.Join("_etcd/machines", c.Name)

View File

@ -0,0 +1,50 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package netutil provides network utility functions, complementing the more
// common ones in the net package.
package netutil
import (
"net"
"sync"
)
// LimitListener returns a Listener that accepts at most n simultaneous
// connections from the provided Listener.
func LimitListener(l net.Listener, n int) net.Listener {
ch := make(chan struct{}, n)
for i := 0; i < n; i++ {
ch <- struct{}{}
}
return &limitListener{l, ch}
}
type limitListener struct {
net.Listener
ch chan struct{}
}
func (l *limitListener) Accept() (net.Conn, error) {
<-l.ch
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return &limitListenerConn{Conn: c, ch: l.ch}, nil
}
type limitListenerConn struct {
net.Conn
ch chan<- struct{}
close sync.Once
}
func (l *limitListenerConn) Close() error {
err := l.Conn.Close()
l.close.Do(func() {
l.ch <- struct{}{}
})
return err
}
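For orientation, a minimal sketch of how this listener is meant to be used; the address, limit, and handler below are illustrative, and the import path is the package's current upstream home rather than anything in this tree:

package main

import (
	"fmt"
	"net"
	"net/http"

	"golang.org/x/net/netutil" // where this netutil package lives today
)

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:8080")
	if err != nil {
		panic(err)
	}
	// At most 100 simultaneous connections; further Accepts block until
	// an active connection closes and returns its token to the channel.
	ln = netutil.LimitListener(ln, 100)
	http.Serve(ln, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "hello")
	}))
}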

View File

@ -0,0 +1,65 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netutil
import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestLimitListener(t *testing.T) {
const (
max = 5
num = 200
)
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Listen: %v", err)
}
defer l.Close()
l = LimitListener(l, max)
var open int32
go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if n := atomic.AddInt32(&open, 1); n > max {
t.Errorf("%d open connections, want <= %d", n, max)
}
defer atomic.AddInt32(&open, -1)
time.Sleep(10 * time.Millisecond)
fmt.Fprint(w, "some body")
}))
var wg sync.WaitGroup
var failed int32
for i := 0; i < num; i++ {
wg.Add(1)
go func() {
defer wg.Done()
r, err := http.Get("http://" + l.Addr().String())
if err != nil {
t.Logf("Get: %v", err)
atomic.AddInt32(&failed, 1)
return
}
defer r.Body.Close()
io.Copy(ioutil.Discard, r.Body)
}()
}
wg.Wait()
// We expect some Gets to fail as the kernel's accept queue is filled,
// but most should succeed.
if failed >= num/2 {
t.Errorf("too many Gets failed")
}
}

View File

@ -24,7 +24,7 @@ type Auth struct {
User, Password string
}
// DefaultDialer returns the dialer specified by the proxy related variables in
// FromEnvironment returns the dialer specified by the proxy related variables in
// the environment.
func FromEnvironment() Dialer {
allProxy := os.Getenv("all_proxy")

View File

@ -332,17 +332,13 @@ func (p *Buffer) buffree(s []byte) {
// Bool is a helper routine that allocates a new bool value
// to store v and returns a pointer to it.
func Bool(v bool) *bool {
p := new(bool)
*p = v
return p
return &v
}
// Int32 is a helper routine that allocates a new int32 value
// to store v and returns a pointer to it.
func Int32(v int32) *int32 {
p := new(int32)
*p = v
return p
return &v
}
// Int is a helper routine that allocates a new int32 value
@ -357,25 +353,19 @@ func Int(v int) *int32 {
// Int64 is a helper routine that allocates a new int64 value
// to store v and returns a pointer to it.
func Int64(v int64) *int64 {
p := new(int64)
*p = v
return p
return &v
}
// Float32 is a helper routine that allocates a new float32 value
// to store v and returns a pointer to it.
func Float32(v float32) *float32 {
p := new(float32)
*p = v
return p
return &v
}
// Float64 is a helper routine that allocates a new float64 value
// to store v and returns a pointer to it.
func Float64(v float64) *float64 {
p := new(float64)
*p = v
return p
return &v
}
// Uint32 is a helper routine that allocates a new uint32 value
@ -389,17 +379,13 @@ func Uint32(v uint32) *uint32 {
// Uint64 is a helper routine that allocates a new uint64 value
// to store v and returns a pointer to it.
func Uint64(v uint64) *uint64 {
p := new(uint64)
*p = v
return p
return &v
}
// String is a helper routine that allocates a new string value
// to store v and returns a pointer to it.
func String(v string) *string {
p := new(string)
*p = v
return p
return &v
}
// EnumName is a helper function to simplify printing protocol buffer enums
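The `return &v` simplification above is safe because Go's escape analysis moves a parameter to the heap when its address escapes, so the pointer stays valid after the function returns; it is exactly equivalent to the new/assign/return form it replaces. A small self-contained sketch (the message type here is hypothetical):

package main

import "fmt"

// Bool mirrors the helper above: &v escapes to the heap, so the returned
// pointer remains valid after Bool returns.
func Bool(v bool) *bool { return &v }

// msg stands in for a generated proto2 message with optional fields.
type msg struct {
	Enabled *bool
}

func main() {
	m := &msg{Enabled: Bool(true)} // instead of: p := new(bool); *p = true
	fmt.Println(*m.Enabled)        // true
}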

View File

@ -13,23 +13,60 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package config
import (
"bufio"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
)
var commentPrefix = []string{"//", "#", ";"}
func Read(filename string) (map[string]string, error) {
var res = map[string]string{}
in, err := os.Open(filename)
// Config struct represents a configuration handler.
type Config struct {
filename string
config map[string]map[string]string
}
// NewConfig function constructs a new Config struct with filename. You have to
// call Read() to let it read from the file; otherwise Get() will return the
// empty string (i.e., ""). Alternatively, call NewConfig() and then Add()/Set()
// to add new key-values to the configuration, and finally call Write() to
// write the new configuration to the file.
func NewConfig(filename string) *Config {
c := new(Config)
c.filename = filename
c.config = make(map[string]map[string]string)
return c
}
// Filename function returns the filename of the configuration.
func (c *Config) Filename() string {
return c.filename
}
// SetFilename function sets the filename of the configuration.
func (c *Config) SetFilename(filename string) {
c.filename = filename
}
// Reset function resets the map in the configuration.
func (c *Config) Reset() {
c.config = make(map[string]map[string]string)
}
// Read function reads configurations from the file defined in
// Config.filename.
func (c *Config) Read() error {
in, err := os.Open(c.filename)
if err != nil {
return res, err
return err
}
defer in.Close()
scanner := bufio.NewScanner(in)
@ -40,9 +77,9 @@ func Read(filename string) (map[string]string, error) {
continue
}
if line == "" {
sec := checkSection(scanner.Text())
if sec != "" {
section = sec + "."
sec, ok := checkSection(scanner.Text())
if ok {
section = sec
continue
}
}
@ -54,40 +91,103 @@ func Read(filename string) (map[string]string, error) {
line = line[:len(line)-1]
continue
}
key, value, err := checkLine(line)
if err != nil {
return res, errors.New("WRONG: " + line)
key, value, ok := checkLine(line)
if !ok {
return errors.New("WRONG: " + line)
}
res[section+key] = value
c.Set(section, key, value)
line = ""
}
return res, nil
return nil
}
func checkSection(line string) string {
// Get function returns the value of a key in the configuration. If the key
// does not exist, it returns the empty string (i.e., "").
func (c *Config) Get(section string, key string) string {
value, ok := c.config[section][key]
if !ok {
return ""
}
return value
}
// Set function updates the value of a key in the configuration. Function
// Set() is exactly the same as function Add().
func (c *Config) Set(section string, key string, value string) {
_, ok := c.config[section]
if !ok {
c.config[section] = make(map[string]string)
}
c.config[section][key] = value
}
// Add function adds a new key to the configuration. Function Add() is exactly
// the same as function Set().
func (c *Config) Add(section string, key string, value string) {
c.Set(section, key, value)
}
// Del function deletes a key from the configuration.
func (c *Config) Del(section string, key string) {
_, ok := c.config[section]
if ok {
delete(c.config[section], key)
if len(c.config[section]) == 0 {
delete(c.config, section)
}
}
}
// Write function writes the updated configuration back.
func (c *Config) Write() error {
return nil
}
// WriteTo function writes the configuration to a new file. This function
// re-organizes the configuration and deletes all the comments.
func (c *Config) WriteTo(filename string) error {
content := ""
for k, v := range c.config {
format := "%v = %v\n"
if k != "" {
content += fmt.Sprintf("[%v]\n", k)
format = "\t" + format
}
for key, value := range v {
content += fmt.Sprintf(format, key, value)
}
}
return ioutil.WriteFile(filename, []byte(content), 0644)
}
// checkSection reports whether the line is a section header. If it is not a
// section, it returns ("", false).
func checkSection(line string) (string, bool) {
line = strings.TrimSpace(line)
lineLen := len(line)
if lineLen < 2 {
return ""
return "", false
}
if line[0] == '[' && line[lineLen-1] == ']' {
return line[1 : lineLen-1]
return line[1 : lineLen-1], true
}
return ""
return "", false
}
func checkLine(line string) (string, string, error) {
// checkLine reports whether the line is a valid key-value pair.
func checkLine(line string) (string, string, bool) {
key := ""
value := ""
sp := strings.SplitN(line, "=", 2)
if len(sp) != 2 {
return key, value, errors.New("WRONG: " + line)
return key, value, false
}
key = strings.TrimSpace(sp[0])
value = strings.TrimSpace(sp[1])
return key, value, nil
return key, value, true
}
// checkComment reports whether the line is a whole-line comment.
func checkComment(line string) bool {
line = strings.TrimSpace(line)
for p := range commentPrefix {
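To make the new API concrete, a sketch of the file format this parser accepts and the calls around it; the file contents, keys, and import path below are illustrative. Whole-line comments start with //, #, or ;, sections are written as [name], and keys that appear before any section live under the empty section name "".

package main

import (
	"fmt"
	"io/ioutil"

	"github.com/ccding/go-config/config" // illustrative import path
)

func main() {
	sample := `// whole-line comments may start with //, #, or ;
dd = 1

[test]
a = hello
`
	if err := ioutil.WriteFile("example.conf", []byte(sample), 0644); err != nil {
		panic(err)
	}
	c := config.NewConfig("example.conf")
	if err := c.Read(); err != nil {
		panic(err)
	}
	fmt.Println(c.Get("", "dd"))    // "1" (key outside any section)
	fmt.Println(c.Get("test", "a")) // "hello"
	c.Set("test", "b", "world")
	c.WriteTo("example2.conf") // re-organized output, comments dropped
}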

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package main
import (
@ -22,9 +22,11 @@ import (
)
func main() {
res, err := config.Read("example.conf")
c := config.NewConfig("example.conf")
err := c.Read()
fmt.Println(err)
fmt.Println(res)
fmt.Println(res["test.a"])
fmt.Println(res["dd"])
fmt.Println(c)
fmt.Println(c.Get("test", "a"))
fmt.Println(c.Get("", "dd"))
c.WriteTo("example2.conf")
}

View File

@ -5,14 +5,6 @@ go-logging is a high-performance logging library for golang.
low delay of about 800 nanoseconds.
## Getting Started
The stable version is under the `stable` branch, which never reverts and is
fully tested. The tags in the `stable` branch indicate the version numbers.
The `master` branch is the unstable version, and the `dev` branch is the
development branch; `master` merges `dev` periodically.
All pull requests should be sent to the `dev` branch.
### Installation
The step below will download the library source code to
`${GOPATH}/src/github.com/ccding/go-logging`.
@ -46,7 +38,6 @@ import (
func main() {
logger, _ := logging.SimpleLogger("main")
logger.SetLevel(logging.DEBUG)
logger.Error("this is a test from error")
logger.Destroy()
}

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package main
import (

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
// Logln receives log request from the client. The request includes a set of

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
import (

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
import (

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
import (

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
// This file defines GetGoId function, which is used to get the id of the
// current goroutine. More details about this function are available in the
// runtime.c file of golang source code.

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
// Level is the type of level.

View File

@ -99,13 +99,15 @@ func RichLogger(name string) (*Logger, error) {
func FileLogger(name string, level Level, format string, timeFormat string, file string, sync bool) (*Logger, error) {
out, err := os.Create(file)
if err != nil {
return new(Logger), err
return nil, err
}
logger, err := createLogger(name, level, format, timeFormat, out, sync)
if err == nil {
logger.fd = out
return logger, nil
} else {
return nil, err
}
return logger, err
}
// WriterLogger creates a new logger with a writer
@ -115,38 +117,35 @@ func WriterLogger(name string, level Level, format string, timeFormat string, ou
// ConfigLogger creates a new logger from a configuration file
func ConfigLogger(filename string) (*Logger, error) {
conf, err := config.Read(filename)
conf := config.NewConfig(filename)
err := conf.Read()
if err != nil {
return new(Logger), err
return nil, err
}
ok := true
name, ok := conf["name"]
if !ok {
name = ""
}
slevel, ok := conf["level"]
if !ok {
name := conf.Get("", "name")
slevel := conf.Get("", "level")
if slevel == "" {
slevel = "0"
}
l, err := strconv.Atoi(slevel)
if err != nil {
return new(Logger), err
return nil, err
}
level := Level(l)
format, ok := conf["format"]
if !ok {
format := conf.Get("", "format")
if format == "" {
format = BasicFormat
}
timeFormat, ok := conf["timeFormat"]
if !ok {
timeFormat := conf.Get("", "timeFormat")
if timeFormat == "" {
timeFormat = DefaultTimeFormat
}
ssync, ok := conf["sync"]
if !ok {
ssync := conf.Get("", "sync")
if ssync == "" {
ssync = "0"
}
file, ok := conf["file"]
if !ok {
file := conf.Get("", "file")
if file == "" {
file = DefaultFileName
}
sync := true
@ -155,7 +154,7 @@ func ConfigLogger(filename string) (*Logger, error) {
} else if ssync == "1" {
sync = true
} else {
return new(Logger), err
return nil, err
}
return FileLogger(name, level, format, timeFormat, file, sync)
}
@ -166,7 +165,7 @@ func createLogger(name string, level Level, format string, timeFormat string, ou
err := logger.parseFormat(format)
if err != nil {
return logger, err
return nil, err
}
// assign values to logger

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
import (

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
// request struct stores the logger request

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
import (

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"strings"
"time"
@ -39,10 +40,10 @@ func NewClient() *Client {
// default leader and machines
cluster := Cluster{
Leader: "0.0.0.0:4001",
Leader: "http://0.0.0.0:4001",
Machines: make([]string, 1),
}
cluster.Machines[0] = "0.0.0.0:4001"
cluster.Machines[0] = "http://0.0.0.0:4001"
config := Config{
// default use http
@ -123,12 +124,19 @@ func (c *Client) internalSyncCluster(machines []string) bool {
continue
} else {
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
// try another machine in the cluster
continue
}
// update Machines List
c.cluster.Machines = strings.Split(string(b), ",")
// update leader
// the first one in the machine list is the leader
logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0])
c.cluster.Leader = c.cluster.Machines[0]
logger.Debug("sync.machines ", c.cluster.Machines)
return true
}
@ -138,8 +146,9 @@ func (c *Client) internalSyncCluster(machines []string) bool {
// serverName should contain both hostName and port
func (c *Client) createHttpPath(serverName string, _path string) string {
httpPath := path.Join(serverName, _path)
return httpPath
u, _ := url.Parse(serverName)
u.Path = path.Join(u.Path, "/", _path)
return u.String()
}
// Dial with timeout.
@ -148,22 +157,21 @@ func dialTimeout(network, addr string) (net.Conn, error) {
}
func (c *Client) getHttpPath(s ...string) string {
httpPath := path.Join(c.cluster.Leader, version)
u, _ := url.Parse(c.cluster.Leader)
u.Path = path.Join(u.Path, "/", version)
for _, seg := range s {
httpPath = path.Join(httpPath, seg)
u.Path = path.Join(u.Path, seg)
}
httpPath = c.config.Scheme + "://" + httpPath
return httpPath
return u.String()
}
func (c *Client) updateLeader(httpPath string) {
// httpPath http://127.0.0.1:4001/v1...
leader := strings.Split(httpPath, "://")[1]
// we want to have 127.0.0.1:4001
u, _ := url.Parse(httpPath)
leader := u.Host
leader = strings.Split(leader, "/")[0]
logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader)
c.cluster.Leader = leader
}
@ -180,6 +188,7 @@ func (c *Client) sendRequest(method string, _path string, body string) (*http.Re
for {
httpPath := c.getHttpPath(_path)
logger.Debug("send.request.to ", httpPath)
if body == "" {

View File

@ -1,19 +1,49 @@
[![Build Status](https://travis-ci.org/benbjohnson/go-raft.png?branch=master)](https://travis-ci.org/benbjohnson/go-raft)
go-raft
=======
[![Build Status](https://travis-ci.org/goraft/raft.png?branch=master)](https://travis-ci.org/goraft/raft)
## Overview
This is an Go implementation of the Raft distributed consensus protocol.
This is a Go implementation of the Raft distributed consensus protocol.
Raft is a protocol by which a cluster of nodes can maintain a replicated state machine.
The state machine is kept in sync through the use of a replicated log.
For more details on Raft, you can read [In Search of an Understandable Consensus Algorithm](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) by Diego Ongaro and John Ousterhout.
For more details on Raft, you can read [In Search of an Understandable Consensus Algorithm][raft-paper] by Diego Ongaro and John Ousterhout.
## Project Status
This library is feature complete but should be considered experimental until it has seen more usage.
If you have any questions on implementing go-raft in your project please file an issue.
There is an [active community][community] of developers who can help.
go-raft is under the MIT license.
[community]: https://github.com/goraft/raft/contributors
### Features
- Leader election
- Log replication
- Configuration changes
- Log compaction
- Unit tests
- Fast Protobuf Log Encoding
- HTTP transport
### Projects
These projects are built on go-raft:
- [coreos/etcd](https://github.com/coreos/etcd) - A highly-available key value store for shared configuration and service discovery
- [benbjohnson/raftd](https://github.com/benbjohnson/raftd) - A reference implementation for using the go-raft library for distributed consensus.
If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples.
## The Raft Protocol
This section provides a summary of the Raft protocol from a high level.
For a more detailed explanation on the failover process and election terms please see the full paper describing the protocol: [In Search of an Understandable Consensus Algorithm][raft-paper].
### Overview
Maintaining state in a single process on a single server is easy.
@ -26,7 +56,7 @@ Servers can crash or the network between two machines can become unavailable or
A distributed consensus protocol is used for maintaining a consistent state across multiple servers in a cluster.
Many distributed systems are built upon the Paxos protocol but Paxos can be difficult to understand and there are many gaps between Paxos and real-world implementations.
An alternative is the [Raft distributed consensus protocol](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) by Diego Ongaro and John Ousterhout.
An alternative is the [Raft distributed consensus protocol][raft-paper] by Diego Ongaro and John Ousterhout.
Raft is a protocol built with understandability as a primary tenet and it centers around two things:
1. Leader Election
@ -53,17 +83,9 @@ By ensuring that this log is replicated identically between all the nodes in the
Replicating the log under normal conditions is done by sending an `AppendEntries` RPC from the leader to each of the other servers in the cluster (called Peers).
Each peer will append the entries from the leader through a 2-phase commit process which ensures that a majority of servers in the cluster have entries written to the log.
For a more detailed explanation on the failover process and election terms please see the full paper describing the protocol: [In Search of an Understandable Consensus Algorithm](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf)
## History
Ben Johnson started this library for use in his behavioral analytics database called [Sky](https://github.com/skydb/sky).
He put it under the MIT license in the hopes that it would be useful for other projects too.
## Project Status
The go-raft library is feature complete but in alpha.
There is a reference implementation called [raftd](https://github.com/benbjohnson/raftd) that demonstrates how to use the library
The library will be considered experimental until it has significant production usage.
I'm writing the library for the purpose of including distributed processing in my behavioral analytics database called [Sky](https://github.com/skydb/sky).
However, I hope other projects can benefit from having a distributed consensus protocol so the go-raft library is available under MIT license.
If you have a project that you're using go-raft in, please add it to this README and send a pull request so others can see implementation examples.
If you have any questions on implementing go-raft in your project, feel free to contact me on [GitHub](https://github.com/benbjohnson), [Twitter](https://twitter.com/benbjohnson) or by e-mail at [ben@skylandlabs.com](mailto:ben@skylandlabs.com).
[raft-paper]: https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf

View File

@ -0,0 +1,7 @@
package raft
type Config struct {
CommitIndex uint64 `json:"commitIndex"`
// TODO decide what we need to store in peer struct
Peers []*Peer `json:"peers"`
}
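For reference, this is roughly what the conf file written by writeConf (later in this diff) looks like once marshaled; the index and peer values here are invented:

package main

import (
	"encoding/json"
	"fmt"
)

type Peer struct {
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
}

type Config struct {
	CommitIndex uint64  `json:"commitIndex"`
	Peers       []*Peer `json:"peers"`
}

func main() {
	b, _ := json.Marshal(&Config{
		CommitIndex: 16,
		Peers:       []*Peer{{Name: "2", ConnectionString: "http://127.0.0.1:7002"}},
	})
	fmt.Println(string(b))
	// {"commitIndex":16,"peers":[{"name":"2","connectionString":"http://127.0.0.1:7002"}]}
}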

View File

@ -94,7 +94,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
return nil
}
url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath())
url := fmt.Sprintf("http://%s%s", peer.Name, t.AppendEntriesPath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
@ -122,7 +122,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
return nil
}
url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath())
url := fmt.Sprintf("http://%s%s", peer.Name, t.RequestVotePath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}

View File

@ -9,7 +9,8 @@ type JoinCommand interface {
// Join command
type DefaultJoinCommand struct {
Name string `json:"name"`
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
}
// The name of the Join command in the log
@ -18,7 +19,7 @@ func (c *DefaultJoinCommand) CommandName() string {
}
func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) {
err := server.AddPeer(c.Name)
err := server.AddPeer(c.Name, c.ConnectionString)
return []byte("join"), err
}
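With the new field, a caller joining a node now passes both the node's name and where it can be reached. A sketch (server setup elided; the node name, URL, and import path are illustrative):

package example

import "github.com/goraft/raft"

// joinCluster issues a join against a running leader s.
func joinCluster(s *raft.Server) error {
	_, err := s.Do(&raft.DefaultJoinCommand{
		Name:             "node2",
		ConnectionString: "http://127.0.0.1:7002",
	})
	return err
}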

View File

@ -183,6 +183,15 @@ func (l *Log) open(path string) error {
// Append entry.
l.entries = append(l.entries, entry)
if entry.Index <= l.commitIndex {
command, err := newCommand(entry.CommandName, entry.Command)
if err != nil {
continue
}
l.ApplyFunc(command)
}
debugln("open.log.append log index ", entry.Index)
readBytes += int64(n)

View File

@ -14,7 +14,8 @@ import (
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
server *Server
name string
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
prevLogIndex uint64
mutex sync.RWMutex
stopChan chan bool
@ -28,10 +29,11 @@ type Peer struct {
//------------------------------------------------------------------------------
// Creates a new peer.
func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
return &Peer{
server: server,
name: name,
Name: name,
ConnectionString: connectionString,
heartbeatTimeout: heartbeatTimeout,
}
}
@ -42,11 +44,6 @@ func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer
//
//------------------------------------------------------------------------------
// Retrieves the name of the peer.
func (p *Peer) Name() string {
return p.name
}
// Sets the heartbeat timeout.
func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
p.heartbeatTimeout = duration
@ -89,17 +86,17 @@ func (p *Peer) startHeartbeat() {
}
// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat() {
func (p *Peer) stopHeartbeat(flush bool) {
// here is a problem
// the previous stop channel had no buffer, so the leader may get blocked
// when heartbeat returns at line 132
// when heartbeat returns
// I make the channel with a buffer of 1
// and panic here if the stop still cannot be delivered
select {
case p.stopChan <- true:
case p.stopChan <- flush:
default:
panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
panic("[" + p.server.Name() + "] cannot stop [" + p.Name + "] heartbeat")
}
}
@ -113,8 +110,9 @@ func (p *Peer) clone() *Peer {
p.mutex.Lock()
defer p.mutex.Unlock()
return &Peer{
name: p.name,
prevLogIndex: p.prevLogIndex,
Name: p.Name,
ConnectionString: p.ConnectionString,
prevLogIndex: p.prevLogIndex,
}
}
@ -128,46 +126,58 @@ func (p *Peer) heartbeat(c chan bool) {
c <- true
debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout)
debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout)
for {
select {
case <-stopChan:
debugln("peer.heartbeat.stop: ", p.Name())
return
case <-time.After(p.heartbeatTimeout):
debugln("peer.heartbeat.run: ", p.Name())
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader {
case flush := <-stopChan:
if !flush {
debugln("peer.heartbeat.stop: ", p.Name)
return
} else {
// before we can safely remove a node
// we must flush the remove command to the node first
p.flush()
debugln("peer.heartbeat.stop: ", p.Name)
return
}
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
case <-time.After(p.heartbeatTimeout):
p.flush()
}
}
}
func (p *Peer) flush() {
debugln("peer.heartbeat.run: ", p.Name)
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader {
return
}
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
}
//--------------------------------------
// Append Entries
//--------------------------------------
// Sends an AppendEntries request to the peer through the transport.
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name, " ", len(req.Entries))
resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
if resp == nil {
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name())
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name)
return
}
traceln("peer.flush.recv: ", p.Name())
traceln("peer.flush.recv: ", p.Name)
// If successful then update the previous log index.
p.mutex.Lock()
@ -181,7 +191,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
resp.append = true
}
}
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name, "; idx =", p.prevLogIndex)
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
@ -195,7 +205,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
p.prevLogIndex = resp.CommitIndex
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
} else if p.prevLogIndex > 0 {
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
@ -206,35 +216,35 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
p.prevLogIndex = resp.Index
}
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
}
}
p.mutex.Unlock()
// Attach the peer to resp so the server knows where it came from
resp.peer = p.Name()
resp.peer = p.Name
// Send response to server for processing.
p.server.send(resp)
}
// Sends a Snapshot request to the peer through the transport.
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
debugln("peer.snap.send: ", p.name)
debugln("peer.snap.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
if resp == nil {
debugln("peer.snap.timeout: ", p.name)
debugln("peer.snap.timeout: ", p.Name)
return
}
debugln("peer.snap.recv: ", p.name)
debugln("peer.snap.recv: ", p.Name)
// If successful, the peer should have been set to snapshot state
// Send it the snapshot!
if resp.Success {
p.sendSnapshotRecoveryRequest()
} else {
debugln("peer.snap.failed: ", p.name)
debugln("peer.snap.failed: ", p.Name)
return
}
@ -243,12 +253,12 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
// Sends a Snapshot Recovery request to the peer through the transport.
func (p *Peer) sendSnapshotRecoveryRequest() {
req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
debugln("peer.snap.recovery.send: ", p.name)
debugln("peer.snap.recovery.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
if resp.Success {
p.prevLogIndex = req.LastIndex
} else {
debugln("peer.snap.recovery.failed: ", p.name)
debugln("peer.snap.recovery.failed: ", p.Name)
return
}
// Send response to server for processing.
@ -261,10 +271,10 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
// Sends a RequestVote request to the peer through the transport.
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
debugln("peer.vote: ", p.server.Name(), "->", p.Name())
debugln("peer.vote: ", p.server.Name(), "->", p.Name)
req.peer = p
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name())
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name)
resp.peer = p
c <- resp
}
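The stop/flush change leans on a small concurrency pattern: a stop channel with a buffer of 1 plus a non-blocking send, so stopping never deadlocks against a busy heartbeat loop, and a second stop fails loudly instead of blocking. A standalone sketch (the names here are mine, not the library's):

package main

import (
	"fmt"
	"time"
)

func main() {
	stopChan := make(chan bool, 1) // one buffered slot, as the comments above describe

	done := make(chan struct{})
	go func() { // stand-in for the heartbeat loop
		defer close(done)
		for {
			select {
			case flush := <-stopChan:
				if flush {
					fmt.Println("flush pending entries, then stop")
				} else {
					fmt.Println("stop immediately")
				}
				return
			case <-time.After(10 * time.Millisecond):
				// periodic heartbeat work would happen here
			}
		}
	}()

	// Non-blocking send: the buffer guarantees the first stop is accepted
	// even while the loop is mid-heartbeat; a second stop would hit default.
	select {
	case stopChan <- true:
	default:
		panic("cannot stop heartbeat")
	}
	<-done
}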

View File

@ -14,12 +14,12 @@ var _ = &json.SyntaxError{}
var _ = math.Inf
type ProtoSnapshotRecoveryRequest struct {
LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"`
LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"`
LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"`
Peers []string `protobuf:"bytes,4,rep" json:"Peers,omitempty"`
State []byte `protobuf:"bytes,5,req" json:"State,omitempty"`
XXX_unrecognized []byte `json:"-"`
LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"`
LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"`
LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"`
Peers []*ProtoSnapshotRecoveryRequest_ProtoPeer `protobuf:"bytes,4,rep" json:"Peers,omitempty"`
State []byte `protobuf:"bytes,5,req" json:"State,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ProtoSnapshotRecoveryRequest) Reset() { *m = ProtoSnapshotRecoveryRequest{} }
@ -47,7 +47,7 @@ func (m *ProtoSnapshotRecoveryRequest) GetLastTerm() uint64 {
return 0
}
func (m *ProtoSnapshotRecoveryRequest) GetPeers() []string {
func (m *ProtoSnapshotRecoveryRequest) GetPeers() []*ProtoSnapshotRecoveryRequest_ProtoPeer {
if m != nil {
return m.Peers
}
@ -61,5 +61,31 @@ func (m *ProtoSnapshotRecoveryRequest) GetState() []byte {
return nil
}
type ProtoSnapshotRecoveryRequest_ProtoPeer struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
ConnectionString *string `protobuf:"bytes,2,req" json:"ConnectionString,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) Reset() {
*m = ProtoSnapshotRecoveryRequest_ProtoPeer{}
}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) String() string { return proto.CompactTextString(m) }
func (*ProtoSnapshotRecoveryRequest_ProtoPeer) ProtoMessage() {}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
}
return ""
}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetConnectionString() string {
if m != nil && m.ConnectionString != nil {
return *m.ConnectionString
}
return ""
}
func init() {
}

View File

@ -3,7 +3,13 @@ package protobuf;
message ProtoSnapshotRecoveryRequest {
required string LeaderName=1;
required uint64 LastIndex=2;
required uint64 LastTerm=3;
repeated string Peers=4;
required uint64 LastTerm=3;
message ProtoPeer {
required string Name=1;
required string ConnectionString=2;
}
repeated ProtoPeer Peers=4;
required bytes State=5;
}

View File

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
"path"
@ -81,8 +80,6 @@ type Server struct {
lastSnapshot *Snapshot
stateMachine StateMachine
maxLogEntriesPerRequest uint64
confFile *os.File
}
// An event to be processed by the server's event loop.
@ -272,11 +269,15 @@ func (s *Server) QuorumSize() int {
// Retrieves the election timeout.
func (s *Server) ElectionTimeout() time.Duration {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.electionTimeout
}
// Sets the election timeout.
func (s *Server) SetElectionTimeout(duration time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.electionTimeout = duration
}
@ -286,6 +287,8 @@ func (s *Server) SetElectionTimeout(duration time.Duration) {
// Retrieves the heartbeat timeout.
func (s *Server) HeartbeatTimeout() time.Duration {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.heartbeatTimeout
}
@ -332,14 +335,14 @@ func (s *Server) Start() error {
// Create snapshot directory if not exist
os.Mkdir(path.Join(s.path, "snapshot"), 0700)
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
@ -368,59 +371,12 @@ func (s *Server) Start() error {
return nil
}
// Read the configuration for the server.
func (s *Server) readConf() error {
var err error
confPath := path.Join(s.path, "conf")
s.debugln("readConf.open ", confPath)
// open conf file
s.confFile, err = os.OpenFile(confPath, os.O_RDWR, 0600)
if err != nil {
if os.IsNotExist(err) {
s.confFile, err = os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE, 0600)
debugln("readConf.create ", confPath)
if err != nil {
return err
}
}
return err
}
peerNames := make([]string, 0)
for {
var peerName string
_, err = fmt.Fscanf(s.confFile, "%s\n", &peerName)
if err != nil {
if err == io.EOF {
s.debugln("server.peer.conf: finish")
break
}
return err
}
s.debugln("server.peer.conf.read: ", peerName)
peerNames = append(peerNames, peerName)
}
s.confFile.Truncate(0)
s.confFile.Seek(0, os.SEEK_SET)
for _, peerName := range peerNames {
s.AddPeer(peerName)
}
return nil
}
// Shuts down the server.
func (s *Server) Stop() {
s.send(&stopValue)
s.mutex.Lock()
defer s.mutex.Unlock()
s.log.close()
s.mutex.Unlock()
}
// Checks if the server is currently running.
@ -532,24 +488,27 @@ func (s *Server) followerLoop() {
case e := <-s.c:
if e.target == &stopValue {
s.setState(Stopped)
} else if command, ok := e.target.(JoinCommand); ok {
//If no log entries exist and a self-join command is issued
//then immediately become leader and commit entry.
if s.log.currentIndex() == 0 && command.NodeName() == s.Name() {
s.debugln("selfjoin and promote to leader")
s.setState(Leader)
s.processCommand(command, e)
} else {
} else {
switch req := e.target.(type) {
case JoinCommand:
//If no log entries exist and a self-join command is issued
//then immediately become leader and commit entry.
if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
s.debugln("selfjoin and promote to leader")
s.setState(Leader)
s.processCommand(req, e)
} else {
err = NotLeaderError
}
case *AppendEntriesRequest:
e.returnValue, update = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, update = s.processRequestVoteRequest(req)
case *SnapshotRequest:
e.returnValue = s.processSnapshotRequest(req)
default:
err = NotLeaderError
}
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, update = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, update = s.processRequestVoteRequest(req)
} else if req, ok := e.target.(*SnapshotRequest); ok {
e.returnValue = s.processSnapshotRequest(req)
} else {
err = NotLeaderError
}
// Callback to event.
@ -629,14 +588,16 @@ func (s *Server) candidateLoop() {
var err error
if e.target == &stopValue {
s.setState(Stopped)
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, _ = s.processRequestVoteRequest(req)
} else {
switch req := e.target.(type) {
case Command:
err = NotLeaderError
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
}
// Callback to event.
e.c <- err
@ -660,7 +621,7 @@ func (s *Server) candidateLoop() {
}
}
// The event loop that is run when the server is in a Candidate state.
// The event loop that is run when the server is in a Leader state.
func (s *Server) leaderLoop() {
s.setState(Leader)
s.syncedPeer = make(map[string]bool)
@ -682,15 +643,18 @@ func (s *Server) leaderLoop() {
case e := <-s.c:
if e.target == &stopValue {
s.setState(Stopped)
} else if command, ok := e.target.(Command); ok {
s.processCommand(command, e)
continue
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if resp, ok := e.target.(*AppendEntriesResponse); ok {
s.processAppendEntriesResponse(resp)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, _ = s.processRequestVoteRequest(req)
} else {
switch req := e.target.(type) {
case Command:
s.processCommand(req, e)
continue
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *AppendEntriesResponse:
s.processAppendEntriesResponse(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
}
// Callback to event.
@ -705,7 +669,7 @@ func (s *Server) leaderLoop() {
// Stop all peers.
for _, peer := range s.peers {
peer.stopHeartbeat()
peer.stopHeartbeat(false)
}
s.syncedPeer = nil
}
@ -720,16 +684,18 @@ func (s *Server) snapshotLoop() {
if e.target == &stopValue {
s.setState(Stopped)
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, _ = s.processRequestVoteRequest(req)
} else if req, ok := e.target.(*SnapshotRecoveryRequest); ok {
e.returnValue = s.processSnapshotRecoveryRequest(req)
} else {
switch req := e.target.(type) {
case Command:
err = NotLeaderError
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
case *SnapshotRecoveryRequest:
e.returnValue = s.processSnapshotRecoveryRequest(req)
}
}
// Callback to event.
e.c <- err
@ -959,31 +925,29 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
//--------------------------------------
// Adds a peer to the server.
func (s *Server) AddPeer(name string) error {
func (s *Server) AddPeer(name string, connectionString string) error {
s.debugln("server.peer.add: ", name, len(s.peers))
defer s.writeConf()
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return nil
}
// Only add the peer if it doesn't have the same name.
if s.name != name {
// when loading snapshot s.confFile should be nil
if s.confFile != nil {
_, err := fmt.Fprintln(s.confFile, name)
s.debugln("server.peer.conf.write: ", name)
if err != nil {
return err
}
}
peer := newPeer(s, name, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.name] = peer
// Skip the Peer if it has the same name as the Server
if s.name == name {
return nil
}
peer := newPeer(s, name, connectionString, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.Name] = peer
s.debugln("server.peer.conf.write: ", name)
return nil
}
@ -991,8 +955,12 @@ func (s *Server) AddPeer(name string) error {
func (s *Server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
// Ignore removal of the server itself.
if s.name == name {
defer s.writeConf()
if name == s.Name() {
// when the removed node restarts, it should be able
// to know that it has been removed before, so we need
// to update knownCommitIndex
return nil
}
// Return error if peer doesn't exist.
@ -1001,23 +969,13 @@ func (s *Server) RemovePeer(name string) error {
return fmt.Errorf("raft: Peer not found: %s", name)
}
// TODO: Flush entries to the peer first.
// Stop peer and remove it.
peer.stopHeartbeat()
if s.State() == Leader {
peer.stopHeartbeat(true)
}
delete(s.peers, name)
s.confFile.Truncate(0)
s.confFile.Seek(0, os.SEEK_SET)
for peer := range s.peers {
_, err := fmt.Fprintln(s.confFile, peer)
if err != nil {
return err
}
}
return nil
}
@ -1054,14 +1012,13 @@ func (s *Server) TakeSnapshot() error {
state = []byte{0}
}
var peerNames []string
var peers []*Peer
for _, peer := range s.peers {
peerNames = append(peerNames, peer.Name())
peers = append(peers, peer.clone())
}
peerNames = append(peerNames, s.Name())
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path}
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
s.saveSnapshot()
@ -1144,8 +1101,8 @@ func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S
s.peers = make(map[string]*Peer)
// recover the cluster configuration
for _, peerName := range req.Peers {
s.AddPeer(peerName)
for _, peer := range req.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
//update term and index
@ -1237,8 +1194,8 @@ func (s *Server) LoadSnapshot() error {
return err
}
for _, peerName := range s.lastSnapshot.Peers {
s.AddPeer(peerName)
for _, peer := range s.lastSnapshot.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
s.log.startTerm = s.lastSnapshot.LastTerm
@ -1248,6 +1205,62 @@ func (s *Server) LoadSnapshot() error {
return err
}
//--------------------------------------
// Config File
//--------------------------------------
func (s *Server) writeConf() {
peers := make([]*Peer, len(s.peers))
i := 0
for _, peer := range s.peers {
peers[i] = peer.clone()
i++
}
r := &Config{
CommitIndex: s.log.commitIndex,
Peers: peers,
}
b, _ := json.Marshal(r)
confPath := path.Join(s.path, "conf")
tmpConfPath := path.Join(s.path, "conf.tmp")
err := ioutil.WriteFile(tmpConfPath, b, 0600)
if err != nil {
panic(err)
}
os.Rename(tmpConfPath, confPath)
}
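writeConf uses the standard crash-safe write: serialize to a temporary file, then rename over the real one. On POSIX filesystems the rename is atomic, so a reader (or a restarting server) sees either the old conf or the new one, never a torn file. A generic sketch of the same pattern (paths invented):

package main

import (
	"io/ioutil"
	"os"
	"path/filepath"
)

// atomicWrite ensures "name" is never observed half-written: the data is
// staged in a temp file and swapped into place with an atomic rename.
func atomicWrite(dir, name string, data []byte) error {
	tmp := filepath.Join(dir, name+".tmp")
	if err := ioutil.WriteFile(tmp, data, 0600); err != nil {
		return err
	}
	return os.Rename(tmp, filepath.Join(dir, name))
}

func main() {
	if err := atomicWrite(".", "conf", []byte(`{"commitIndex":0,"peers":null}`)); err != nil {
		panic(err)
	}
}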
// Read the configuration for the server.
func (s *Server) readConf() error {
confPath := path.Join(s.path, "conf")
s.debugln("readConf.open ", confPath)
// open conf file
b, err := ioutil.ReadFile(confPath)
if err != nil {
return nil
}
conf := &Config{}
if err = json.Unmarshal(b, conf); err != nil {
return err
}
s.log.commitIndex = conf.CommitIndex
return nil
}
//--------------------------------------
// Debugging
//--------------------------------------

View File

@ -164,10 +164,10 @@ func TestServerPromote(t *testing.T) {
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return lookup[peer.Name()].RequestVote(req)
return lookup[peer.Name].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return lookup[peer.Name()].AppendEntries(req)
return lookup[peer.Name].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
@ -316,6 +316,124 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
}
}
//--------------------------------------
// Recovery
//--------------------------------------
// Ensure that a server can recover from its previous log and configuration.
func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
// Initialize the servers.
var mutex sync.RWMutex
servers := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
s := servers[peer.Name]
mutex.RUnlock()
return s.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.Name]
mutex.RUnlock()
return s.AppendEntries(req)
}
disTransporter := &testTransporter{}
disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return nil
}
disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return nil
}
var names []string
var paths = make(map[string]string)
n := 5
// add n servers
for i := 1; i <= n; i++ {
names = append(names, strconv.Itoa(i))
}
var leader *Server
for _, name := range names {
server := newTestServer(name, transporter)
servers[name] = server
paths[name] = server.Path()
if name == "1" {
leader = server
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
time.Sleep(testHeartbeatTimeout)
} else {
server.SetElectionTimeout(testElectionTimeout)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
time.Sleep(testHeartbeatTimeout)
}
if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
t.Fatalf("Unable to join server[%s]: %v", name, err)
}
}
// commit some commands
for i := 0; i < 10; i++ {
if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
t.Fatalf("cannot commit command:", err.Error())
}
}
time.Sleep(2 * testHeartbeatTimeout)
for _, name := range names {
server := servers[name]
if server.CommitIndex() != 16 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
}
server.Stop()
}
for _, name := range names {
// with old path and disable transportation
server := newTestServerWithPath(name, disTransporter, paths[name])
servers[name] = server
server.Start()
// should only commit to the last join command
if server.CommitIndex() != 6 {
t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 6)
}
// peer conf should be recovered
if len(server.Peers()) != 4 {
t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(server.Peers()), 4)
}
}
// let nodes talk to each other
for _, name := range names {
servers[name].SetTransporter(transporter)
}
time.Sleep(2 * testElectionTimeout)
// should commit to the previous index + 1 (nop command when a new leader is elected)
for _, name := range names {
server := servers[name]
if server.CommitIndex() != 17 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
}
server.Stop()
}
}
//--------------------------------------
// Membership
//--------------------------------------
@ -357,13 +475,13 @@ func TestServerMultiNode(t *testing.T) {
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.AppendEntries(req)
}

View File

@ -21,9 +21,9 @@ type Snapshot struct {
LastIndex uint64 `json:"lastIndex"`
LastTerm uint64 `json:"lastTerm"`
// cluster configuration.
Peers []string `json: "peers"`
State []byte `json: "state"`
Path string `json: "path"`
Peers []*Peer `json: "peers"`
State []byte `json: "state"`
Path string `json: "path"`
}
// Save the snapshot to a file

View File

@ -12,7 +12,7 @@ type SnapshotRecoveryRequest struct {
LeaderName string
LastIndex uint64
LastTerm uint64
Peers []string
Peers []*Peer
State []byte
}
@ -36,11 +36,21 @@ func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *Snapshot
// Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) {
protoPeers := make([]*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer, len(req.Peers))
for i, peer := range req.Peers {
protoPeers[i] = &protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer{
Name: proto.String(peer.Name),
ConnectionString: proto.String(peer.ConnectionString),
}
}
pb := &protobuf.ProtoSnapshotRecoveryRequest{
LeaderName: proto.String(req.LeaderName),
LastIndex: proto.Uint64(req.LastIndex),
LastTerm: proto.Uint64(req.LastTerm),
Peers: req.Peers,
Peers: protoPeers,
State: req.State,
}
p, err := proto.Marshal(pb)
@ -62,7 +72,7 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
totalBytes := len(data)
pb := &protobuf.ProtoSnapshotRequest{}
pb := &protobuf.ProtoSnapshotRecoveryRequest{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
@ -70,8 +80,16 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
req.LeaderName = pb.GetLeaderName()
req.LastIndex = pb.GetLastIndex()
req.LastTerm = pb.GetLastTerm()
req.Peers = req.Peers
req.State = req.State
req.Peers = make([]*Peer, len(pb.Peers))
for i, peer := range pb.Peers {
req.Peers[i] = &Peer{
Name: peer.GetName(),
ConnectionString: peer.GetConnectionString(),
}
}
return totalBytes, nil
}

View File

@ -69,6 +69,11 @@ func newTestServer(name string, transporter Transporter) *Server {
return server
}
func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
server, _ := NewServer(name, p, transporter, nil, nil)
return server
}
func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server {
server := newTestServer(name, transporter)
f, err := os.Create(server.LogPath())
@ -100,7 +105,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
for _, peer := range servers {
server.AddPeer(peer.Name())
server.AddPeer(peer.Name(), "")
}
}
return servers

View File

@ -47,7 +47,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name())
u, _ := nameToRaftURL(peer.Name)
debugf("Send LogEntries to %s ", u)
resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
@ -74,7 +74,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name())
u, _ := nameToRaftURL(peer.Name)
debugf("Send Vote to %s", u)
resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
@ -100,7 +100,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name())
u, _ := nameToRaftURL(peer.Name)
debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
@ -128,7 +128,7 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name())
u, _ := nameToRaftURL(peer.Name)
debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)