make store system communicate with raft

This commit is contained in:
Xiang Li
2013-06-09 10:42:34 -07:00
parent e9ac8b1b98
commit 69a8116272
8 changed files with 675 additions and 15 deletions

135
command.go Normal file
View File

@@ -0,0 +1,135 @@
package main
//------------------------------------------------------------------------------
//
// Commands
//
//------------------------------------------------------------------------------
import (
"github.com/benbjohnson/go-raft"
"encoding/json"
)
// A command represents an action to be taken on the replicated state machine.
type Command interface {
CommandName() string
Apply(server *raft.Server) ([]byte, error)
GeneratePath() string
Type() string
GetValue() string
GetKey() string
}
// Set command
type SetCommand struct {
Key string `json:"key"`
Value string `json:"value"`
}
// The name of the command in the log
func (c *SetCommand) CommandName() string {
return "set"
}
// Set the value of key to value
func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) {
res := s.Set(c.Key, c.Value)
return json.Marshal(res)
}
func (c *SetCommand) GeneratePath() string{
return "/set/" + c.Key
}
func (c *SetCommand) Type() string{
return "POST"
}
func (c *SetCommand) GetValue() string{
return c.Value
}
func (c *SetCommand) GetKey() string{
return c.Key
}
// Get command
type GetCommand struct {
Key string `json:"key"`
}
// The name of the command in the log
func (c *GetCommand) CommandName() string {
return "get"
}
// Set the value of key to value
func (c *GetCommand) Apply(server *raft.Server) ([]byte, error){
res := s.Get(c.Key)
return json.Marshal(res)
}
func (c *GetCommand) GeneratePath() string{
return "/get/" + c.Key
}
func (c *GetCommand) Type() string{
return "GET"
}
func (c *GetCommand) GetValue() string{
return ""
}
func (c *GetCommand) GetKey() string{
return c.Key
}
// Delete command
type DeleteCommand struct {
Key string `json:"key"`
}
// The name of the command in the log
func (c *DeleteCommand) CommandName() string {
return "delete"
}
// Set the value of key to value
func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){
res := s.Delete(c.Key)
return json.Marshal(res)
}
func (c *DeleteCommand) GeneratePath() string{
return "/delete/" + c.Key
}
func (c *DeleteCommand) Type() string{
return "GET"
}
func (c *DeleteCommand) GetValue() string{
return ""
}
func (c *DeleteCommand) GetKey() string{
return c.Key
}
// joinCommand
type joinCommand struct {
Name string `json:"name"`
}
func (c *joinCommand) CommandName() string {
return "join"
}
func (c *joinCommand) Apply(server *raft.Server) ([]byte, error) {
err := server.AddPeer(c.Name)
return nil, err
}

190
handlers.go Normal file
View File

@@ -0,0 +1,190 @@
package main
import (
"github.com/benbjohnson/go-raft"
"net/http"
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"io/ioutil"
"bytes"
)
//--------------------------------------
// HTTP Handlers
//--------------------------------------
func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
debug("[recv] GET http://%v/log", server.Name())
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(server.LogEntries())
}
func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
debug("[recv] POST http://%v/join", server.Name())
command := &joinCommand{}
if err := decodeJsonRequest(req, command); err == nil {
if _, err= server.Do(command); err != nil {
warn("raftd: Unable to join: %v", err)
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
} else {
warn("[join] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
}
func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
rvreq := &raft.RequestVoteRequest{}
err := decodeJsonRequest(req, rvreq)
if err == nil {
debug("[recv] POST http://%v/vote [%s]", server.Name(), rvreq.CandidateName)
if resp, _ := server.RequestVote(rvreq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
w.WriteHeader(http.StatusInternalServerError)
}
func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.AppendEntriesRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
if resp, _ := server.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
if !resp.Success {
fmt.Println("append error")
}
return
}
}
warn("[append] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
debug("[recv] POST http://%s/snapshot/ ", server.Name())
if resp, _ := server.SnapshotRecovery(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
warn("[snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
debug("[recv] POST http://%v/set/%s", server.Name(), vars["key"])
content, err := ioutil.ReadAll(req.Body)
if err != nil {
warn("raftd: Unable to read: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
command := &SetCommand{}
command.Key = vars["key"]
command.Value = string(content)
Dispatch(server, command, w)
}
func GetHttpHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
debug("[recv] GET http://%v/get/%s", server.Name(), vars["key"])
command := &GetCommand{}
command.Key = vars["key"]
Dispatch(server, command, w)
}
func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
var body []byte
var err error
// unlikely to fail twice
for {
// i am the leader, i will take care of the command
if server.State() == "leader" {
if body, err = server.Do(command); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
} else {
// good to go
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
// redirect the command to the current leader
} else {
leaderName := server.Leader()
if leaderName =="" {
// no luckey, during the voting process
continue
}
path := command.GeneratePath()
if command.Type() == "POST" {
debug("[send] POST http://%v/%s", leaderName, path)
reader := bytes.NewReader([]byte(command.GetValue()))
reps, _ := http.Post(fmt.Sprintf("http://%v/%s",
leaderName, command.GeneratePath()), "application/json", reader)
reps.Body.Read(body)
// good to go
w.WriteHeader(http.StatusOK)
w.Write(body)
} else if command.Type() == "GET" {
debug("[send] GET http://%v/%s", leaderName, path)
reps, _ := http.Get(fmt.Sprintf("http://%v/%s",
leaderName, command.GeneratePath()))
// good to go
reps.Body.Read(body)
w.WriteHeader(http.StatusOK)
w.Write(body)
} else {
//unsupported type
}
if err != nil {
// should check other errors
continue
} else {
//good to go
return
}
}
}
}

259
raftd.go Normal file
View File

@@ -0,0 +1,259 @@
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"github.com/benbjohnson/go-raft"
"github.com/gorilla/mux"
"log"
"io"
"io/ioutil"
"net/http"
"strings"
"os"
"time"
)
//------------------------------------------------------------------------------
//
// Initialization
//
//------------------------------------------------------------------------------
var verbose bool
func init() {
flag.BoolVar(&verbose, "v", false, "verbose logging")
flag.BoolVar(&verbose, "verbose", false, "verbose logging")
}
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
type Info struct {
Host string `json:"host"`
Port int `json:"port"`
}
//------------------------------------------------------------------------------
//
// Variables
//
//------------------------------------------------------------------------------
var server *raft.Server
var logger *log.Logger
//------------------------------------------------------------------------------
//
// Functions
//
//------------------------------------------------------------------------------
//--------------------------------------
// Main
//--------------------------------------
func main() {
var err error
logger = log.New(os.Stdout, "", log.LstdFlags)
flag.Parse()
if verbose {
fmt.Println("Verbose logging enabled.\n")
}
// Setup commands.
raft.RegisterCommand(&joinCommand{})
raft.RegisterCommand(&SetCommand{})
raft.RegisterCommand(&GetCommand{})
raft.RegisterCommand(&DeleteCommand{})
// Use the present working directory if a directory was not passed in.
var path string
if flag.NArg() == 0 {
path, _ = os.Getwd()
} else {
path = flag.Arg(0)
if err := os.MkdirAll(path, 0744); err != nil {
fatal("Unable to create path: %v", err)
}
}
// Read server info from file or grab it from user.
var info *Info = getInfo(path)
name := fmt.Sprintf("%s:%d", info.Host, info.Port)
fmt.Printf("Name: %s\n\n", name)
t := transHandler{}
// Setup new raft server.
server, err = raft.NewServer(name, path, t, nil)
//server.DoHandler = DoHandler;
server.SetElectionTimeout(2 * time.Second)
server.SetHeartbeatTimeout(1 * time.Second)
if err != nil {
fatal("%v", err)
}
server.Start()
// Join to another server if we don't have a log.
if server.IsLogEmpty() {
var leaderHost string
fmt.Println("This server has no log. Please enter a server in the cluster to join\nto or hit enter to initialize a cluster.");
fmt.Printf("Join to (host:port)> ");
fmt.Scanf("%s", &leaderHost)
if leaderHost == "" {
server.Initialize()
} else {
join(server)
fmt.Println("success join")
}
}
// open snapshot
//go server.Snapshot()
// Create HTTP interface.
r := mux.NewRouter()
// internal commands
r.HandleFunc("/join", JoinHttpHandler).Methods("POST")
r.HandleFunc("/vote", VoteHttpHandler).Methods("POST")
r.HandleFunc("/log", GetLogHttpHandler).Methods("GET")
r.HandleFunc("/log/append", AppendEntriesHttpHandler).Methods("POST")
r.HandleFunc("/snapshot", SnapshotHttpHandler).Methods("POST")
// external commands
r.HandleFunc("/set/{key}", SetHttpHandler).Methods("POST")
r.HandleFunc("/get/{key}", GetHttpHandler).Methods("GET")
//r.HandleFunc("/delete/{key}", DeleteHttpHandler).Methods("GET")
http.Handle("/", r)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil))
}
func usage() {
fatal("usage: raftd [PATH]")
}
//--------------------------------------
// Config
//--------------------------------------
func getInfo(path string) *Info {
info := &Info{}
// Read in the server info if available.
infoPath := fmt.Sprintf("%s/info", path)
if file, err := os.Open(infoPath); err == nil {
if content, err := ioutil.ReadAll(file); err != nil {
fatal("Unable to read info: %v", err)
} else {
if err = json.Unmarshal(content, &info); err != nil {
fatal("Unable to parse info: %v", err)
}
}
file.Close()
// Otherwise ask user for info and write it to file.
} else {
fmt.Printf("Enter hostname: [localhost] ");
fmt.Scanf("%s", &info.Host)
info.Host = strings.TrimSpace(info.Host)
if info.Host == "" {
info.Host = "localhost"
}
fmt.Printf("Enter port: [4001] ");
fmt.Scanf("%d", &info.Port)
if info.Port == 0 {
info.Port = 4001
}
// Write to file.
content, _ := json.Marshal(info)
content = []byte(string(content) + "\n")
if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
fatal("Unable to write info to file: %v", err)
}
}
return info
}
//--------------------------------------
// Handlers
//--------------------------------------
// Send join requests to the leader.
func join(s *raft.Server) error {
var b bytes.Buffer
command := &joinCommand{}
command.Name = s.Name()
json.NewEncoder(&b).Encode(command)
debug("[send] POST http://%v/join", "localhost:4001")
resp, err := http.Post(fmt.Sprintf("http://%s/join", "localhost:4001"), "application/json", &b)
if resp != nil {
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return nil
}
}
return fmt.Errorf("raftd: Unable to join: %v", err)
}
//--------------------------------------
// HTTP Utilities
//--------------------------------------
func decodeJsonRequest(req *http.Request, data interface{}) error {
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&data); err != nil && err != io.EOF {
logger.Println("Malformed json request: %v", err)
return fmt.Errorf("Malformed json request: %v", err)
}
return nil
}
func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if data != nil {
encoder := json.NewEncoder(w)
encoder.Encode(data)
}
}
//--------------------------------------
// Log
//--------------------------------------
func debug(msg string, v ...interface{}) {
if verbose {
logger.Printf("DEBUG " + msg + "\n", v...)
}
}
func info(msg string, v ...interface{}) {
logger.Printf("INFO " + msg + "\n", v...)
}
func warn(msg string, v ...interface{}) {
logger.Printf("WARN " + msg + "\n", v...)
}
func fatal(msg string, v ...interface{}) {
logger.Printf("FATAL " + msg + "\n", v...)
os.Exit(1)
}

View File

@@ -1,9 +1,9 @@
package raftd
package main
import (
"path"
"errors"
"encoding/json"
"fmt"
)
// CONSTANTS
@@ -17,6 +17,12 @@ type Store struct {
Nodes map[string]string `json:"nodes"`
}
type Response struct {
OldValue string `json:oldvalue`
Exist bool `json:exist`
}
// global store
var s *Store
@@ -32,8 +38,8 @@ func createStore() *Store{
}
// set the key to value, return the old value if the key exists
func (s *Store) Set(key string, value string) (string, bool) {
func (s *Store) Set(key string, value string) Response {
fmt.Println("Store SET")
key = path.Clean(key)
oldValue, ok := s.Nodes[key]
@@ -41,30 +47,31 @@ func (s *Store) Set(key string, value string) (string, bool) {
if ok {
s.Nodes[key] = value
w.notify(SET, key, oldValue, value)
return oldValue, true
return Response{oldValue, true}
} else {
s.Nodes[key] = value
w.notify(SET, key, "", value)
return "", false
return Response{"", false}
}
}
// get the value of the key
func (s *Store) Get(key string) (string, error) {
func (s *Store) Get(key string) Response {
fmt.Println("Stroe Get")
key = path.Clean(key)
value, ok := s.Nodes[key]
if ok {
return value, nil
return Response{value, true}
} else {
return "", errors.New("Key does not exist")
return Response{"", false}
}
}
// delete the key, return the old value if the key exists
func (s *Store) Delete(key string) (string, error) {
func (s *Store) Delete(key string) Response {
key = path.Clean(key)
oldValue, ok := s.Nodes[key]
@@ -74,9 +81,9 @@ func (s *Store) Delete(key string) (string, error) {
w.notify(DELETE, key, oldValue, "")
return oldValue, nil
return Response{oldValue, true}
} else {
return "", errors.New("Key does not exist")
return Response{"", false}
}
}

View File

@@ -1,4 +1,4 @@
package raftd
package main
import (
"testing"

69
trans_handler.go Normal file
View File

@@ -0,0 +1,69 @@
package main
import(
"encoding/json"
"github.com/benbjohnson/go-raft"
"bytes"
"net/http"
"fmt"
"io"
)
type transHandler struct {
name string
}
// Sends AppendEntries RPCs to a peer when the server is the leader.
func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) (*raft.AppendEntriesResponse, error) {
var aersp *raft.AppendEntriesResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("[send] POST http://%s/log/append [%d]", peer.Name(), len(req.Entries))
resp, err := http.Post(fmt.Sprintf("http://%s/log/append", peer.Name()), "application/json", &b)
if resp != nil {
defer resp.Body.Close()
aersp = &raft.AppendEntriesResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp, nil
}
}
return aersp, fmt.Errorf("raftd: Unable to append entries: %v", err)
}
// Sends RequestVote RPCs to a peer when the server is the candidate.
func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) (*raft.RequestVoteResponse, error) {
var rvrsp *raft.RequestVoteResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("[send] POST http://%s/vote", peer.Name())
resp, err := http.Post(fmt.Sprintf("http://%s/vote", peer.Name()), "application/json", &b)
if resp != nil {
defer resp.Body.Close()
rvrsp := &raft.RequestVoteResponse{}
if err = json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
return rvrsp, nil
}
}
return rvrsp, fmt.Errorf("raftd: Unable to request vote: %v", err)
}
// Sends SnapshotRequest RPCs to a peer when the server is the candidate.
func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) (*raft.SnapshotResponse, error) {
var aersp *raft.SnapshotResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("[send] POST http://%s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex)
resp, err := http.Post(fmt.Sprintf("http://%s/snapshot", peer.Name()), "application/json", &b)
if resp != nil {
defer resp.Body.Close()
aersp = &raft.SnapshotResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp, nil
}
}
fmt.Println("error send snapshot")
return aersp, fmt.Errorf("raftd: Unable to send snapshot: %v", err)
}

View File

@@ -1,4 +1,4 @@
package raftd
package main
import (
"path"

View File

@@ -1,4 +1,4 @@
package raftd
package main
import (
"testing"