This commit is contained in:
Xiang Li
2013-06-20 15:59:23 -07:00
parent 58e7b456bb
commit ef59a03fbb
11 changed files with 233 additions and 260 deletions

View File

@@ -1,4 +1,5 @@
package main
//------------------------------------------------------------------------------
//
// Commands
@@ -6,11 +7,11 @@ package main
//------------------------------------------------------------------------------
import (
"encoding/json"
"github.com/benbjohnson/go-raft"
"github.com/xiangli-cmu/raft-etcd/store"
"encoding/json"
"time"
)
)
// A command represents an action to be taken on the replicated state machine.
type Command interface {
@@ -20,8 +21,8 @@ type Command interface {
// Set command
type SetCommand struct {
Key string `json:"key"`
Value string `json:"value"`
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
}
@@ -40,7 +41,6 @@ func (c *SetCommand) GeneratePath() string {
return "set/" + c.Key
}
// Get command
type GetCommand struct {
Key string `json:"key"`
@@ -52,12 +52,12 @@ func (c *GetCommand) CommandName() string {
}
// Set the value of key to value
func (c *GetCommand) Apply(server *raft.Server) ([]byte, error){
func (c *GetCommand) Apply(server *raft.Server) ([]byte, error) {
res := store.Get(c.Key)
return json.Marshal(res)
}
func (c *GetCommand) GeneratePath() string{
func (c *GetCommand) GeneratePath() string {
return "get/" + c.Key
}
@@ -71,8 +71,8 @@ func (c *DeleteCommand) CommandName() string {
return "delete"
}
// Delete the key
func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){
// Delete the key
func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error) {
return store.Delete(c.Key)
}
@@ -86,14 +86,14 @@ func (c *WatchCommand) CommandName() string {
return "watch"
}
func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error){
func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error) {
ch := make(chan store.Response)
// add to the watchers list
store.AddWatcher(c.Key, ch)
store.AddWatcher(c.Key, ch)
// wait for the notification for any changing
res := <- ch
res := <-ch
return json.Marshal(res)
}
@@ -112,6 +112,3 @@ func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) {
// no result will be returned
return nil, err
}

View File

@@ -1,17 +1,16 @@
package main
import (
"encoding/json"
"github.com/benbjohnson/go-raft"
"net/http"
"encoding/json"
//"fmt"
"io/ioutil"
//"bytes"
"time"
"strings"
"strconv"
)
"strings"
"time"
)
//--------------------------------------
// HTTP Handlers
@@ -73,9 +72,8 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}
func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &JoinCommand{}
if err := decodeJsonRequest(req, command); err == nil {
@@ -87,7 +85,6 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
}
}
func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/set/"):]
@@ -96,7 +93,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
if err != nil {
warn("raftd: Unable to read: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
return
}
debug("[recv] POST http://%v/set/%s [%s]", server.Name(), key, content)
@@ -112,11 +109,11 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
if err != nil {
warn("raftd: Bad duration: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
return
}
command.ExpireTime = time.Now().Add(time.Second * (time.Duration)(duration))
} else {
command.ExpireTime = time.Unix(0,0)
command.ExpireTime = time.Unix(0, 0)
}
excute(command, &w)
@@ -134,7 +131,6 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) {
excute(command, &w)
}
func excute(c Command, w *http.ResponseWriter) {
if server.State() == "leader" {
if body, err := server.Do(c); err != nil {
@@ -152,11 +148,11 @@ func excute(c Command, w *http.ResponseWriter) {
(*w).Write([]byte(server.Leader()))
return
}
(*w).WriteHeader(http.StatusInternalServerError)
return
}
}
func MasterHttpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
@@ -202,8 +198,3 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
}
}

147
raftd.go
View File

@@ -2,23 +2,23 @@ package main
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/pem"
"flag"
"fmt"
"github.com/benbjohnson/go-raft"
"log"
"github.com/xiangli-cmu/raft-etcd/store"
"github.com/xiangli-cmu/raft-etcd/web"
"io"
"io/ioutil"
"log"
"net/http"
"strings"
"os"
"time"
"strconv"
"crypto/tls"
"crypto/x509"
"github.com/xiangli-cmu/raft-etcd/web"
"github.com/xiangli-cmu/raft-etcd/store"
"strings"
"time"
)
//------------------------------------------------------------------------------
@@ -27,7 +27,6 @@ import (
//
//------------------------------------------------------------------------------
var verbose bool
var leaderHost string
var address string
@@ -45,15 +44,16 @@ func init() {
flag.StringVar(&certFile, "cert", "", "the cert file of the server")
flag.StringVar(&keyFile, "key", "", "the key file of the server")
}
// CONSTANTS
const (
const (
HTTP = iota
HTTPS
HTTPSANDVERIFY
)
const (
ELECTIONTIMTOUT = 3 * time.Second
ELECTIONTIMTOUT = 3 * time.Second
HEARTBEATTIMEOUT = 1 * time.Second
)
@@ -65,7 +65,7 @@ const (
type Info struct {
Host string `json:"host"`
Port int `json:"port"`
Port int `json:"port"`
}
//------------------------------------------------------------------------------
@@ -79,7 +79,6 @@ var logger *log.Logger
var storeMsg chan string
//------------------------------------------------------------------------------
//
// Functions
@@ -100,7 +99,7 @@ func main() {
raft.RegisterCommand(&SetCommand{})
raft.RegisterCommand(&GetCommand{})
raft.RegisterCommand(&DeleteCommand{})
// Use the present working directory if a directory was not passed in.
var path string
if flag.NArg() == 0 {
@@ -118,7 +117,7 @@ func main() {
name := fmt.Sprintf("%s:%d", info.Host, info.Port)
fmt.Printf("Name: %s\n\n", name)
// secrity type
st := securityType()
@@ -126,7 +125,7 @@ func main() {
panic("ERROR type")
}
t := createTranHandler(st)
t := createTranHandler(st)
// Setup new raft server.
s := store.GetStore()
@@ -159,7 +158,7 @@ func main() {
server.Do(command)
debug("%s start as a leader", server.Name())
// start as a fellower in a existing cluster
// start as a fellower in a existing cluster
} else {
server.StartElectionTimeout()
server.StartFollower()
@@ -171,7 +170,7 @@ func main() {
fmt.Println("success join")
}
// rejoin the previous cluster
// rejoin the previous cluster
} else {
server.StartElectionTimeout()
server.StartFollower()
@@ -181,15 +180,14 @@ func main() {
// open the snapshot
go server.Snapshot()
if webPort != -1 {
// start web
s.SetMessager(&storeMsg)
go webHelper()
go web.Start(server, webPort)
}
if webPort != -1 {
// start web
s.SetMessager(&storeMsg)
go webHelper()
go web.Start(server, webPort)
}
startTransport(info.Port, st)
startTransport(info.Port, st)
}
@@ -216,12 +214,12 @@ func createTranHandler(st int) transHandler {
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{tlsCert},
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{tlsCert},
InsecureSkipVerify: true,
},
DisableCompression: true,
}
},
DisableCompression: true,
}
t.client = &http.Client{Transport: tr}
return t
@@ -231,36 +229,35 @@ func createTranHandler(st int) transHandler {
return transHandler{}
}
func startTransport(port int, st int) {
func startTransport(port int, st int) {
// internal commands
http.HandleFunc("/join", JoinHttpHandler)
http.HandleFunc("/vote", VoteHttpHandler)
http.HandleFunc("/log", GetLogHttpHandler)
http.HandleFunc("/log/append", AppendEntriesHttpHandler)
http.HandleFunc("/snapshot", SnapshotHttpHandler)
http.HandleFunc("/join", JoinHttpHandler)
http.HandleFunc("/vote", VoteHttpHandler)
http.HandleFunc("/log", GetLogHttpHandler)
http.HandleFunc("/log/append", AppendEntriesHttpHandler)
http.HandleFunc("/snapshot", SnapshotHttpHandler)
// external commands
http.HandleFunc("/set/", SetHttpHandler)
http.HandleFunc("/get/", GetHttpHandler)
http.HandleFunc("/delete/", DeleteHttpHandler)
http.HandleFunc("/watch/", WatchHttpHandler)
http.HandleFunc("/master", MasterHttpHandler)
// external commands
http.HandleFunc("/set/", SetHttpHandler)
http.HandleFunc("/get/", GetHttpHandler)
http.HandleFunc("/delete/", DeleteHttpHandler)
http.HandleFunc("/watch/", WatchHttpHandler)
http.HandleFunc("/master", MasterHttpHandler)
switch st {
switch st {
case HTTP:
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
case HTTP:
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
case HTTPS:
http.ListenAndServeTLS(fmt.Sprintf(":%d", port), certFile, keyFile, nil)
case HTTPS:
http.ListenAndServeTLS(fmt.Sprintf(":%d", port), certFile, keyFile, nil)
case HTTPSANDVERIFY:
pemByte, _ := ioutil.ReadFile(CAFile)
case HTTPSANDVERIFY:
pemByte, _ := ioutil.ReadFile(CAFile)
block, pemByte := pem.Decode(pemByte)
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
@@ -274,16 +271,16 @@ func startTransport(port int, st int) {
server := &http.Server{
TLSConfig: &tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
},
Addr:fmt.Sprintf(":%d", port),
ClientCAs: certPool,
},
Addr: fmt.Sprintf(":%d", port),
}
err = server.ListenAndServeTLS(certFile, keyFile)
if err != nil {
log.Fatal(err)
}
}
}
}
@@ -291,8 +288,8 @@ func startTransport(port int, st int) {
// Config
//--------------------------------------
func securityType() int{
if keyFile == "" && certFile == "" && CAFile == ""{
func securityType() int {
if keyFile == "" && certFile == "" && CAFile == "" {
return HTTP
@@ -310,7 +307,6 @@ func securityType() int{
return -1
}
func getInfo(path string) *Info {
info := &Info{}
@@ -325,10 +321,10 @@ func getInfo(path string) *Info {
}
}
file.Close()
// Otherwise ask user for info and write it to file.
// Otherwise ask user for info and write it to file.
} else {
if address == "" {
fatal("Please give the address of the local machine")
}
@@ -341,9 +337,9 @@ func getInfo(path string) *Info {
info.Host = input[0]
info.Host = strings.TrimSpace(info.Host)
info.Port, err = strconv.Atoi(input[1])
if err != nil {
fatal("Wrong port %s", address)
}
@@ -355,11 +351,10 @@ func getInfo(path string) *Info {
fatal("Unable to write info to file: %v", err)
}
}
return info
}
//--------------------------------------
// Handlers
//--------------------------------------
@@ -367,15 +362,14 @@ func getInfo(path string) *Info {
// Send join requests to the leader.
func Join(s *raft.Server, serverName string) error {
var b bytes.Buffer
command := &JoinCommand{}
command.Name = s.Name()
json.NewEncoder(&b).Encode(command)
// t must be ok
t,_ := server.Transporter().(transHandler)
t, _ := server.Transporter().(transHandler)
debug("Send Join Request to %s", serverName)
resp, err := Post(&t, fmt.Sprintf("%s/join", serverName), &b)
@@ -399,6 +393,7 @@ func Join(s *raft.Server, serverName string) error {
}
return fmt.Errorf("Unable to join: %v", err)
}
//--------------------------------------
// Web Helper
//--------------------------------------
@@ -410,7 +405,6 @@ func webHelper() {
}
}
//--------------------------------------
// HTTP Utilities
//--------------------------------------
@@ -434,13 +428,13 @@ func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
}
}
func Post(t *transHandler, path string, body io.Reader) (*http.Response, error){
func Post(t *transHandler, path string, body io.Reader) (*http.Response, error) {
if t.client != nil {
resp, err := t.client.Post("https://" + path, "application/json", body)
resp, err := t.client.Post("https://"+path, "application/json", body)
return resp, err
} else {
resp, err := http.Post("http://" + path, "application/json", body)
resp, err := http.Post("http://"+path, "application/json", body)
return resp, err
}
}
@@ -461,22 +455,19 @@ func Get(t *transHandler, path string) (*http.Response, error) {
func debug(msg string, v ...interface{}) {
if verbose {
logger.Printf("DEBUG " + msg + "\n", v...)
logger.Printf("DEBUG "+msg+"\n", v...)
}
}
func info(msg string, v ...interface{}) {
logger.Printf("INFO " + msg + "\n", v...)
logger.Printf("INFO "+msg+"\n", v...)
}
func warn(msg string, v ...interface{}) {
logger.Printf("Alpaca Server: WARN " + msg + "\n", v...)
logger.Printf("Alpaca Server: WARN "+msg+"\n", v...)
}
func fatal(msg string, v ...interface{}) {
logger.Printf("FATAL " + msg + "\n", v...)
logger.Printf("FATAL "+msg+"\n", v...)
os.Exit(1)
}

View File

@@ -1,11 +1,11 @@
package store
import (
"path"
"encoding/json"
"time"
"fmt"
)
"path"
"time"
)
// global store
var s *Store
@@ -13,26 +13,24 @@ var s *Store
// CONSTANTS
const (
ERROR = -1 + iota
SET
SET
DELETE
GET
)
var PERMANENT = time.Unix(0,0)
var PERMANENT = time.Unix(0, 0)
type Store struct {
// use the build-in hash map as the key-value store structure
Nodes map[string]Node `json:"nodes"`
Nodes map[string]Node `json:"nodes"`
// the string channel to send messages to the outside world
// now we use it to send changes to the hub of the web service
messager *chan string
}
type Node struct {
Value string `json:"value"`
Value string `json:"value"`
// if the node is a permanent one the ExprieTime will be Unix(0,0)
// Otherwise after the expireTime, the node will be deleted
@@ -43,14 +41,14 @@ type Node struct {
}
type Response struct {
Action int `json:"action"`
Action int `json:"action"`
Key string `json:"key"`
OldValue string `json:"oldValue"`
NewValue string `json:"newValue"`
// if the key existed before the action, this field should be true
// if the key did not exist before the action, this field should be false
Exist bool `json:"exist"`
Exist bool `json:"exist"`
Expiration time.Time `json:"expiration"`
}
@@ -61,7 +59,7 @@ func init() {
}
// make a new stroe
func createStore() *Store{
func createStore() *Store {
s := new(Store)
s.Nodes = make(map[string]Node)
return s
@@ -73,11 +71,11 @@ func GetStore() *Store {
}
// set the messager of the store
func (s *Store)SetMessager(messager *chan string) {
func (s *Store) SetMessager(messager *chan string) {
s.messager = messager
}
}
// set the key to value, return the old value if the key exists
// set the key to value, return the old value if the key exists
func Set(key string, value string, expireTime time.Time) ([]byte, error) {
key = path.Clean(key)
@@ -97,11 +95,11 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
node, ok := s.Nodes[key]
if ok {
// if node is not permanent before
// if node is not permanent before
// update its expireTime
if !node.ExpireTime.Equal(PERMANENT) {
node.update <- expireTime
node.update <- expireTime
} else {
// if we want the permanent node to have expire time
@@ -115,7 +113,7 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
// update the information of the node
node.ExpireTime = expireTime
node.Value = value
resp := Response{SET, key, node.Value, value, true, expireTime}
msg, err := json.Marshal(resp)
@@ -123,14 +121,14 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
notify(resp)
// send to the messager
if (s.messager != nil && err == nil) {
if s.messager != nil && err == nil {
*s.messager <- string(msg)
}
}
return msg, err
// add new node
// add new node
} else {
update := make(chan time.Time)
@@ -149,10 +147,10 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
notify(resp)
// notify the web interface
if (s.messager != nil && err == nil) {
if s.messager != nil && err == nil {
*s.messager <- string(msg)
}
}
return msg, err
}
@@ -180,10 +178,10 @@ func expire(key string, update chan time.Time, expireTime time.Time) {
notify(resp)
// notify the messager
if (s.messager != nil && err == nil) {
if s.messager != nil && err == nil {
*s.messager <- string(msg)
}
}
return
@@ -191,7 +189,7 @@ func expire(key string, update chan time.Time, expireTime time.Time) {
case updateTime := <-update:
//update duration
// if the node become a permanent one, the go routine is
// if the node become a permanent one, the go routine is
// not needed
if updateTime.Equal(PERMANENT) {
return
@@ -242,10 +240,10 @@ func Delete(key string) ([]byte, error) {
notify(resp)
// notify the messager
if (s.messager != nil && err == nil) {
if s.messager != nil && err == nil {
*s.messager <- string(msg)
}
}
return msg, err
@@ -256,7 +254,7 @@ func Delete(key string) ([]byte, error) {
}
// save the current state of the storage system
func (s *Store)Save() ([]byte, error) {
func (s *Store) Save() ([]byte, error) {
b, err := json.Marshal(s)
if err != nil {
fmt.Println(err)
@@ -266,7 +264,7 @@ func (s *Store)Save() ([]byte, error) {
}
// recovery the state of the stroage system from a previous state
func (s *Store)Recovery(state []byte) error {
func (s *Store) Recovery(state []byte) error {
err := json.Unmarshal(state, s)
// clean the expired nodes
@@ -277,7 +275,7 @@ func (s *Store)Recovery(state []byte) error {
// clean all expired keys
func clean() {
for key, node := range s.Nodes{
for key, node := range s.Nodes {
if node.ExpireTime.Equal(PERMANENT) {
continue

View File

@@ -1,9 +1,9 @@
package store
import (
"fmt"
"testing"
"time"
"fmt"
)
func TestStoreGet(t *testing.T) {
@@ -53,7 +53,6 @@ func TestStoreGet(t *testing.T) {
// t.Fatalf("Get expired value")
// }
// s.Delete("foo")
// }
@@ -63,8 +62,8 @@ func TestExpire(t *testing.T) {
fmt.Println("TEST EXPIRE")
// test expire
Set("foo", "bar", time.Now().Add(time.Second * 1))
time.Sleep(2*time.Second)
Set("foo", "bar", time.Now().Add(time.Second*1))
time.Sleep(2 * time.Second)
res := Get("foo")
@@ -73,7 +72,7 @@ func TestExpire(t *testing.T) {
}
//test change expire time
Set("foo", "bar", time.Now().Add(time.Second * 10))
Set("foo", "bar", time.Now().Add(time.Second*10))
res = Get("foo")
@@ -81,7 +80,7 @@ func TestExpire(t *testing.T) {
t.Fatalf("Cannot get Value")
}
Set("foo", "barbar", time.Now().Add(time.Second * 1))
Set("foo", "barbar", time.Now().Add(time.Second*1))
time.Sleep(2 * time.Second)
@@ -91,13 +90,12 @@ func TestExpire(t *testing.T) {
t.Fatalf("Got expired value")
}
// test change expire to stable
Set("foo", "bar", time.Now().Add(time.Second * 1))
Set("foo", "bar", time.Now().Add(time.Second*1))
Set("foo", "bar", time.Unix(0,0))
Set("foo", "bar", time.Unix(0, 0))
time.Sleep(2*time.Second)
time.Sleep(2 * time.Second)
res = s.Get("foo")
@@ -105,22 +103,21 @@ func TestExpire(t *testing.T) {
t.Fatalf("Cannot get Value")
}
// test stable to expire
s.Set("foo", "bar", time.Now().Add(time.Second * 1))
time.Sleep(2*time.Second)
// test stable to expire
s.Set("foo", "bar", time.Now().Add(time.Second*1))
time.Sleep(2 * time.Second)
res = s.Get("foo")
if res.Exist {
t.Fatalf("Got expired value")
}
// test set older node
s.Set("foo", "bar", time.Now().Add(-time.Second * 1))
// test set older node
s.Set("foo", "bar", time.Now().Add(-time.Second*1))
res = s.Get("foo")
if res.Exist {
t.Fatalf("Got expired value")
}
}

View File

@@ -3,9 +3,9 @@ package store
import (
"path"
"strings"
//"fmt"
)
//"fmt"
)
type Watchers struct {
chanMap map[string][]chan Response
@@ -14,7 +14,6 @@ type Watchers struct {
// global watcher
var w *Watchers
// init the global watcher
func init() {
w = createWatcher()
@@ -66,7 +65,7 @@ func notify(resp Response) error {
for _, c := range chans {
c <- resp
}
// we have notified all the watchers at this path
// delete the map
delete(w.chanMap, currPath)
@@ -75,4 +74,4 @@ func notify(resp Response) error {
}
return nil
}
}

View File

@@ -1,8 +1,8 @@
package store
import (
"testing"
"fmt"
"testing"
"time"
)

View File

@@ -1,16 +1,16 @@
package main
import(
"encoding/json"
"github.com/benbjohnson/go-raft"
import (
"bytes"
"net/http"
"encoding/json"
"fmt"
"github.com/benbjohnson/go-raft"
"io"
"net/http"
)
type transHandler struct {
name string
name string
client *http.Client
}
@@ -19,7 +19,9 @@ func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
var aersp *raft.AppendEntriesResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("Send LogEntries to %s ", peer.Name())
resp, err := Post(&t, fmt.Sprintf("%s/log/append", peer.Name()), &b)
if resp != nil {
@@ -28,7 +30,7 @@ func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp, nil
}
}
return aersp, fmt.Errorf("raftd: Unable to append entries: %v", err)
}
@@ -39,6 +41,8 @@ func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("Send Vote to %s", peer.Name())
resp, err := Post(&t, fmt.Sprintf("%s/vote", peer.Name()), &b)
if resp != nil {
@@ -47,9 +51,9 @@ func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req
if err = json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
return rvrsp, nil
}
}
return rvrsp, fmt.Errorf("raftd: Unable to request vote: %v", err)
return rvrsp, fmt.Errorf("Unable to request vote: %v", err)
}
// Sends SnapshotRequest RPCs to a peer when the server is the candidate.
@@ -58,7 +62,8 @@ func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer,
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("[send] POST %s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex)
debug("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(),
req.LastTerm, req.LastIndex)
resp, err := Post(&t, fmt.Sprintf("%s/snapshot", peer.Name()), &b)
@@ -70,5 +75,5 @@ func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer,
return aersp, nil
}
}
return aersp, fmt.Errorf("raftd: Unable to send snapshot: %v", err)
}
return aersp, fmt.Errorf("Unable to send snapshot: %v", err)
}

View File

@@ -1,17 +1,17 @@
package web
import (
"code.google.com/p/go.net/websocket"
)
type connection struct {
// The websocket connection.
ws *websocket.Conn
// Buffered channel of outbound messages.
send chan string
}
func (c *connection) writer() {
for message := range c.send {
err := websocket.Message.Send(c.ws, message)
@@ -21,10 +21,10 @@ func (c *connection) writer() {
}
c.ws.Close()
}
func wsHandler(ws *websocket.Conn) {
c := &connection{send: make(chan string, 256), ws: ws}
h.register <- c
defer func() { h.unregister <- c }()
c.writer()
}
}

View File

@@ -1,61 +1,61 @@
package web
type hub struct {
// status
open bool
// status
open bool
// Registered connections.
connections map[*connection]bool
// Registered connections.
connections map[*connection]bool
// Inbound messages from the connections.
broadcast chan string
// Inbound messages from the connections.
broadcast chan string
// Register requests from the connections.
register chan *connection
// Register requests from the connections.
register chan *connection
// Unregister requests from connections.
unregister chan *connection
// Unregister requests from connections.
unregister chan *connection
}
var h = hub{
open: false,
broadcast: make(chan string),
register: make(chan *connection),
unregister: make(chan *connection),
connections: make(map[*connection]bool),
open: false,
broadcast: make(chan string),
register: make(chan *connection),
unregister: make(chan *connection),
connections: make(map[*connection]bool),
}
func Hub() *hub{
return &h
func Hub() *hub {
return &h
}
func HubOpen() bool {
return h.open
return h.open
}
func (h *hub) run() {
h.open = true
for {
select {
case c := <-h.register:
h.connections[c] = true
case c := <-h.unregister:
delete(h.connections, c)
close(c.send)
case m := <-h.broadcast:
for c := range h.connections {
select {
case c.send <- m:
default:
delete(h.connections, c)
close(c.send)
go c.ws.Close()
}
}
}
}
h.open = true
for {
select {
case c := <-h.register:
h.connections[c] = true
case c := <-h.unregister:
delete(h.connections, c)
close(c.send)
case m := <-h.broadcast:
for c := range h.connections {
select {
case c.send <- m:
default:
delete(h.connections, c)
close(c.send)
go c.ws.Close()
}
}
}
}
}
func (h *hub) Send(msg string) {
h.broadcast <- msg
}
h.broadcast <- msg
}

View File

@@ -1,42 +1,41 @@
package web
import (
"fmt"
"net/http"
"github.com/xiangli-cmu/raft-etcd/store"
"github.com/benbjohnson/go-raft"
"time"
"code.google.com/p/go.net/websocket"
"html/template"
"code.google.com/p/go.net/websocket"
"fmt"
"github.com/benbjohnson/go-raft"
"github.com/xiangli-cmu/raft-etcd/store"
"html/template"
"net/http"
"time"
)
var s *raft.Server
type MainPage struct {
Leader string
Address string
Leader string
Address string
}
func handler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Leader:\n%s\n", s.Leader())
fmt.Fprintf(w, "Peers:\n")
fmt.Fprintf(w, "Leader:\n%s\n", s.Leader())
fmt.Fprintf(w, "Peers:\n")
for peerName, _ := range s.Peers() {
fmt.Fprintf(w, "%s\n", peerName)
}
for peerName, _ := range s.Peers() {
fmt.Fprintf(w, "%s\n", peerName)
}
fmt.Fprintf(w, "Data\n")
fmt.Fprintf(w, "Data\n")
s := store.GetStore()
s := store.GetStore()
for key, node := range s.Nodes {
if node.ExpireTime.Equal(time.Unix(0,0)) {
fmt.Fprintf(w, "%s %s\n", key, node.Value)
} else {
fmt.Fprintf(w, "%s %s %s\n", key, node.Value, node.ExpireTime)
}
}
for key, node := range s.Nodes {
if node.ExpireTime.Equal(time.Unix(0, 0)) {
fmt.Fprintf(w, "%s %s\n", key, node.Value)
} else {
fmt.Fprintf(w, "%s %s %s\n", key, node.Value, node.ExpireTime)
}
}
}
@@ -44,24 +43,20 @@ var mainTempl = template.Must(template.ParseFiles("home.html"))
func mainHandler(c http.ResponseWriter, req *http.Request) {
p := &MainPage{Leader: s.Leader(),
Address: s.Name(),}
p := &MainPage{Leader: s.Leader(),
Address: s.Name()}
mainTempl.Execute(c, p)
mainTempl.Execute(c, p)
}
func Start(server *raft.Server, port int) {
s = server
go h.run()
http.HandleFunc("/", mainHandler)
http.Handle("/ws", websocket.Handler(wsHandler))
go h.run()
http.HandleFunc("/", mainHandler)
http.Handle("/ws", websocket.Handler(wsHandler))
//http.HandleFunc("/", handler)
fmt.Println("web listening at port ", port)
http.ListenAndServe(fmt.Sprintf(":%v", port), nil)
//http.HandleFunc("/", handler)
fmt.Println("web listening at port ", port)
http.ListenAndServe(fmt.Sprintf(":%v", port), nil)
}