2015-12-17 14:41:37 -08:00

235 lines
6.1 KiB
Go

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"log"
"os"
"strconv"
"time"
"net/http"
"net/url"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
// raftNode is a key-value stream backed by raft.
type raftNode struct {
	// Channels shared with the client of the raft session.
	proposeC <-chan string // messages proposed by the client (k,v)
	commitC  chan *string  // log entries published to the client (k,v)
	errorC   chan error    // errors raised by the raft session

	// Static description of this member.
	id     int      // client ID for the raft session
	peers  []string // raft peer URLs
	waldir string   // path to the WAL directory

	// Raft machinery backing the commit/error channels.
	node        raft.Node
	raftStorage *raft.MemoryStorage
	wal         *wal.WAL
	transport   *rafthttp.Transport
}
// newRaftNode initiates a raft instance and returns its committed log entry
// channel and its error channel. Proposals for log updates are read from the
// provided proposal channel. Entries replayed from the WAL are delivered over
// the commit channel first, followed by a nil message (to indicate the channel
// is current), then new log entries.
//
// The raft instance is started on its own goroutine; the WAL directory name is
// derived from id.
func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string, <-chan error) {
	commits := make(chan *string)
	errs := make(chan error)

	// node, wal and transport are filled in by startRaft after WAL replay
	rc := &raftNode{
		proposeC:    proposeC,
		commitC:     commits,
		errorC:      errs,
		id:          id,
		peers:       peers,
		waldir:      fmt.Sprintf("raftexample-%d", id),
		raftStorage: raft.NewMemoryStorage(),
	}

	go rc.startRaft()
	return commits, errs
}
// publishEntries sends the payload of every non-empty normal entry in ents,
// in order, over the commit channel. Each send blocks until the receiver
// takes the value.
func (rc *raftNode) publishEntries(ents []raftpb.Entry) {
	for i := range ents {
		ent := &ents[i]
		// conf changes and empty messages are not forwarded
		if ent.Type == raftpb.EntryNormal && len(ent.Data) != 0 {
			payload := string(ent.Data)
			rc.commitC <- &payload
		}
	}
}
// openWAL returns a WAL ready for reading. When wal.Exist reports that
// rc.waldir holds no WAL yet, the directory and an empty WAL are created
// first. Any failure terminates the process through log.Fatalf.
func (rc *raftNode) openWAL() *wal.WAL {
	if !wal.Exist(rc.waldir) {
		// MkdirAll, unlike Mkdir, succeeds when the directory is already
		// present (for instance left behind by an interrupted earlier run).
		if err := os.MkdirAll(rc.waldir, 0750); err != nil {
			log.Fatalf("raftexample: cannot create dir for wal (%v)", err)
		}
		w, err := wal.Create(rc.waldir, nil)
		if err != nil {
			log.Fatalf("raftexample: create wal error (%v)", err)
		}
		// The WAL is reopened just below, so the handle returned by Create
		// has to be closed successfully first.
		if err := w.Close(); err != nil {
			log.Fatalf("raftexample: close wal error (%v)", err)
		}
	}

	w, err := wal.Open(rc.waldir, walpb.Snapshot{})
	if err != nil {
		log.Fatalf("raftexample: error loading wal (%v)", err)
	}
	return w
}
// replayWAL replays the WAL into the raft storage and the commit channel and
// returns the opened WAL.
//
// The hard state and the entries read from the WAL are both loaded into
// rc.raftStorage, so that a node created from that storage starts from the
// persisted state. Every replayed entry is then published on the commit
// channel, followed by a nil value marking the channel as current. Any failure
// terminates the process through log.Fatalf.
func (rc *raftNode) replayWAL() *wal.WAL {
	w := rc.openWAL()
	_, st, ents, err := w.ReadAll()
	if err != nil {
		log.Fatalf("raftexample: failed to read WAL (%v)", err)
	}

	// restore the persisted hard state alongside the log; without it the
	// storage would hand raft an empty state after a restart
	if err := rc.raftStorage.SetHardState(st); err != nil {
		log.Fatalf("raftexample: failed to restore hard state (%v)", err)
	}
	// append to storage so raft starts at the right place in log
	if err := rc.raftStorage.Append(ents); err != nil {
		log.Fatalf("raftexample: failed to append WAL entries to storage (%v)", err)
	}

	rc.publishEntries(ents)
	// send nil value so client knows commit channel is current
	rc.commitC <- nil
	return w
}
// writeError delivers err on the error channel and then shuts the raft
// session down via stop. The send blocks until a receiver takes the error.
func (rc *raftNode) writeError(err error) {
	// report first: stop closes the error channel
	rc.errorC <- err

	rc.stop()
}
// stop closes the commit and error channels and then stops the raft node.
// It must run at most once: closing an already closed channel panics.
func (rc *raftNode) stop() {
	// closing the channels signals receivers that nothing more will arrive
	close(rc.commitC)
	close(rc.errorC)

	rc.node.Stop()
}
// startRaft replays the WAL, starts (or, when a WAL already existed, restarts)
// the raft node, brings up the rafthttp transport with every other peer
// attached, and finally launches the serveRaft and serveChannels goroutines.
func (rc *raftNode) startRaft() {
	// sampled before replayWAL, which creates the WAL when it is missing
	hadWAL := wal.Exist(rc.waldir)
	rc.wal = rc.replayWAL()

	cfg := &raft.Config{
		ID:              uint64(rc.id),
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         rc.raftStorage,
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
	}

	if !hadWAL {
		// fresh start: peer IDs are the 1-based positions in rc.peers
		bootstrap := make([]raft.Peer, 0, len(rc.peers))
		for i := range rc.peers {
			bootstrap = append(bootstrap, raft.Peer{ID: uint64(i + 1)})
		}
		rc.node = raft.StartNode(cfg, bootstrap)
	} else {
		rc.node = raft.RestartNode(cfg)
	}

	serverStats := &stats.ServerStats{}
	serverStats.Initialize()

	rc.transport = &rafthttp.Transport{
		ID:          types.ID(rc.id),
		ClusterID:   0x1000,
		Raft:        rc,
		ServerStats: serverStats,
		LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
		ErrorC:      make(chan error),
	}
	rc.transport.Start()

	// register every peer except this node itself
	for i, peerURL := range rc.peers {
		peerID := i + 1
		if peerID == rc.id {
			continue
		}
		rc.transport.AddPeer(types.ID(peerID), []string{peerURL})
	}

	go rc.serveRaft()
	go rc.serveChannels()
}
// serveChannels runs the raft event loop until the client closes the proposal
// channel or the transport reports an error.
//
// It ticks the raft node every 100ms, forwards client proposals to raft, and
// for each Ready batch saves the hard state and entries to the WAL, appends
// the entries to the in-memory storage, sends the outbound messages, publishes
// the batch's entries on the commit channel and advances the node. A failure
// to persist a batch terminates the process through log.Fatalf. The WAL is
// closed when the function returns.
func (rc *raftNode) serveChannels() {
	defer rc.wal.Close()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// cancelled on return so the proposal goroutine never outlives the loop
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Proposals are forwarded from their own goroutine. Propose may block
	// until raft is able to accept the proposal; blocking inside the event
	// loop would also hold up the ticks and Ready handling that raft needs
	// in order to make progress.
	proposalsClosed := make(chan struct{})
	go func() {
		for {
			select {
			case prop, ok := <-rc.proposeC:
				if !ok {
					// client closed channel; tell the event loop to shut down
					close(proposalsClosed)
					return
				}
				if err := rc.node.Propose(ctx, []byte(prop)); err != nil {
					log.Printf("raftexample: failed to propose (%v)", err)
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	// event loop on raft updates
	for {
		select {
		case <-ticker.C:
			rc.node.Tick()

		case <-proposalsClosed:
			rc.stop()
			return

		// store raft entries to wal, then publish over commit channel
		case rd := <-rc.node.Ready():
			if err := rc.wal.Save(rd.HardState, rd.Entries); err != nil {
				log.Fatalf("raftexample: failed to save to WAL (%v)", err)
			}
			if err := rc.raftStorage.Append(rd.Entries); err != nil {
				log.Fatalf("raftexample: failed to append entries to storage (%v)", err)
			}
			rc.transport.Send(rd.Messages)
			// NOTE(review): this publishes rd.Entries (the entries being
			// appended), not rd.CommittedEntries; confirm that is intended.
			rc.publishEntries(rd.Entries)
			rc.node.Advance()

		case err := <-rc.transport.ErrorC:
			rc.writeError(err)
			return
		}
	}
}
// serveRaft serves the rafthttp transport handler over HTTP on the host part
// of this node's own peer URL, rc.peers[rc.id-1]. It terminates the process
// through log.Fatalf if that URL cannot be parsed or if the server returns an
// error.
func (rc *raftNode) serveRaft() {
	self, err := url.Parse(rc.peers[rc.id-1])
	if err != nil {
		log.Fatalf("raftexample: Failed parsing URL (%v)", err)
	}

	srv := &http.Server{
		Addr:    self.Host,
		Handler: rc.transport.Handler(),
	}
	err = srv.ListenAndServe()
	if err != nil {
		log.Fatalf("raftexample: Failed serving rafthttp (%v)", err)
	}
}
// Process feeds the message m into the local raft node through Step and
// returns Step's result.
func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
	err := rc.node.Step(ctx, m)
	return err
}
// IsIDRemoved always reports false; the id argument is not inspected.
func (rc *raftNode) IsIDRemoved(id uint64) bool {
	return false
}
// ReportUnreachable is a no-op; the report for id is dropped.
func (rc *raftNode) ReportUnreachable(id uint64) {
}
// ReportSnapshot is a no-op; the snapshot status for id is dropped.
func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
}