*: raft related int64 -> uint64

Xiang Li 2014-10-08 19:58:53 +08:00
parent af5b8c6c44
commit 8bbbaa88b2
18 changed files with 88 additions and 98 deletions

View File

@ -29,12 +29,12 @@ type Discoverer interface {
type discovery struct {
cluster string
id int64
id uint64
config string
c client.Client
}
func New(durl string, id int64, config string) (Discoverer, error) {
func New(durl string, id uint64, config string) (Discoverer, error) {
u, err := url.Parse(durl)
if err != nil {
return nil, err

View File

@ -12,9 +12,9 @@ import (
)
// Cluster is a list of Members that belong to the same raft cluster
type Cluster map[int64]*Member
type Cluster map[uint64]*Member
func (c Cluster) FindID(id int64) *Member {
func (c Cluster) FindID(id uint64) *Member {
return c[id]
}
@ -49,7 +49,7 @@ func (c *Cluster) AddSlice(mems []Member) error {
// Pick chooses a random address from a given Member's addresses, and returns it as
// an addressable URI. If the given member does not exist, an empty string is returned.
func (c Cluster) Pick(id int64) string {
func (c Cluster) Pick(id uint64) string {
if m := c.FindID(id); m != nil {
urls := m.PeerURLs
if len(urls) == 0 {
@ -95,12 +95,12 @@ func (c Cluster) String() string {
return strings.Join(sl, ",")
}
func (c Cluster) IDs() []int64 {
var ids []int64
func (c Cluster) IDs() []uint64 {
var ids []uint64
for _, m := range c {
ids = append(ids, m.ID)
}
sort.Sort(types.Int64Slice(ids))
sort.Sort(types.Uint64Slice(ids))
return ids
}
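For reference, a minimal sketch of the uint64-keyed cluster map changed above; the Member shape is trimmed down and the IDs and URLs are made up.

package main

import "fmt"

// Member is reduced to the two fields this sketch needs.
type Member struct {
	ID       uint64
	PeerURLs []string
}

// Cluster maps a member ID to its Member, as in the diff above.
type Cluster map[uint64]*Member

func (c Cluster) FindID(id uint64) *Member { return c[id] }

func main() {
	c := Cluster{
		1: {ID: 1, PeerURLs: []string{"http://10.0.0.1:2379"}},
		2: {ID: 2, PeerURLs: []string{"http://10.0.0.2:2379"}},
	}
	if m := c.FindID(2); m != nil {
		fmt.Println(m.PeerURLs[0]) // http://10.0.0.2:2379
	}
}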

View File

@ -18,7 +18,7 @@ const (
type ClusterStore interface {
Add(m Member)
Get() Cluster
Remove(id int64)
Remove(id uint64)
}
type clusterStore struct {
@ -69,7 +69,7 @@ func (s *clusterStore) Get() Cluster {
// Remove removes a member from the store.
// The given id MUST exist.
func (s *clusterStore) Remove(id int64) {
func (s *clusterStore) Remove(id uint64) {
p := s.Get().FindID(id).storeKey()
if _, err := s.Store.Delete(p, false, false); err != nil {
log.Panicf("delete peer should never fail: %v", err)

View File

@ -90,7 +90,7 @@ func TestClusterPick(t *testing.T) {
func TestClusterFind(t *testing.T) {
tests := []struct {
id int64
id uint64
name string
mems []Member
match bool
@ -207,7 +207,7 @@ func TestClusterIDs(t *testing.T) {
{ID: 4},
{ID: 100},
})
w := []int64{1, 4, 100}
w := []uint64{1, 4, 100}
g := cs.IDs()
if !reflect.DeepEqual(w, g) {
t.Errorf("IDs=%+v, want %+v", g, w)

View File

@ -13,7 +13,7 @@ type ServerConfig struct {
DiscoveryURL string
ClientURLs types.URLs
DataDir string
SnapCount int64
SnapCount uint64
Cluster *Cluster
ClusterState ClusterState
Transport *http.Transport

View File

@ -79,7 +79,7 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
rr, err := parseRequest(r, etcdserver.GenID())
rr, err := parseRequest(r, int64(etcdserver.GenID()))
if err != nil {
writeError(w, err)
return

View File

@ -501,8 +501,8 @@ func TestWriteError(t *testing.T) {
type dummyRaftTimer struct{}
func (drt dummyRaftTimer) Index() int64 { return int64(100) }
func (drt dummyRaftTimer) Term() int64 { return int64(5) }
func (drt dummyRaftTimer) Index() uint64 { return uint64(100) }
func (drt dummyRaftTimer) Term() uint64 { return uint64(5) }
func TestWriteEvent(t *testing.T) {
// nil event should not panic
@ -1246,4 +1246,4 @@ func (c *fakeCluster) Get() etcdserver.Cluster {
return *cl
}
func (c *fakeCluster) Remove(id int64) { return }
func (c *fakeCluster) Remove(id uint64) { return }

View File

@ -14,7 +14,7 @@ import (
const machineKVPrefix = "/_etcd/machines/"
type Member struct {
ID int64
ID uint64
Name string
// TODO(philips): ensure these are URLs
PeerURLs []string
@ -36,14 +36,10 @@ func newMember(name string, peerURLs types.URLs, now *time.Time) *Member {
}
hash := sha1.Sum(b)
m.ID = int64(binary.BigEndian.Uint64(hash[:8]))
if m.ID < 0 {
m.ID = m.ID * -1
}
m.ID = binary.BigEndian.Uint64(hash[:8])
return m
}
func (m Member) storeKey() string {
return path.Join(machineKVPrefix, strconv.FormatUint(uint64(m.ID), 16))
return path.Join(machineKVPrefix, strconv.FormatUint(m.ID, 16))
}
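The removed negation hack existed because an int64 ID taken from a SHA-1 hash could come out negative; with uint64 the full 64-bit hash range is usable directly. Below is a minimal sketch of that derivation. The exact byte layout hashed by newMember is not shown in this diff, so the input encoding here is illustrative only.

package main

import (
	"crypto/sha1"
	"encoding/binary"
	"fmt"
)

// memberID hashes the inputs and keeps the first 8 bytes as a big-endian
// uint64, mirroring the change above; the byte layout is illustrative.
func memberID(name string, peerURLs []string) uint64 {
	h := sha1.New()
	h.Write([]byte(name))
	for _, u := range peerURLs {
		h.Write([]byte(u))
	}
	sum := h.Sum(nil)
	// With int64, the top bit of the hash made roughly half of all IDs
	// negative, hence the removed `m.ID = m.ID * -1` workaround.
	return binary.BigEndian.Uint64(sum[:8])
}

func main() {
	fmt.Printf("%016x\n", memberID("mem1", []string{"http://10.0.0.8:2379"}))
}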

View File

@ -17,9 +17,9 @@ func timeParse(value string) *time.Time {
func TestMemberTime(t *testing.T) {
tests := []struct {
mem *Member
id int64
id uint64
}{
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil), 7206348984215161146},
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil), 11240395089494390470},
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}}, timeParse("1984-12-23T15:04:05Z")), 5483967913615174889},
}
for i, tt := range tests {

View File

@ -79,8 +79,8 @@ type Server interface {
}
type RaftTimer interface {
Index() int64
Term() int64
Index() uint64
Term() uint64
}
// NewServer creates a new EtcdServer from the supplied configuration. The
@ -125,7 +125,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
if cfg.DiscoveryURL != "" {
log.Printf("etcd: warn: ignoring discovery URL: etcd has already been initialized and has a valid log in %q", waldir)
}
var index int64
var index uint64
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
log.Fatal(err)
@ -194,11 +194,11 @@ type EtcdServer struct {
ticker <-chan time.Time
syncTicker <-chan time.Time
snapCount int64 // number of entries to trigger a snapshot
snapCount uint64 // number of entries to trigger a snapshot
// Cache of the latest raft index and raft term the server has seen
raftIndex int64
raftTerm int64
raftIndex uint64
raftTerm uint64
}
// Start prepares and starts server in a new goroutine. It is no longer safe to
@ -231,8 +231,8 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
func (s *EtcdServer) run() {
var syncC <-chan time.Time
// snapi indicates the index of the last submitted snapshot request
var snapi, appliedi int64
var nodes []int64
var snapi, appliedi uint64
var nodes []uint64
for {
select {
case <-s.ticker:
@ -260,12 +260,12 @@ func (s *EtcdServer) run() {
panic("TODO: this is bad, what do we do about it?")
}
s.applyConfChange(cc)
s.w.Trigger(cc.ID, nil)
s.w.Trigger(int64(cc.ID), nil)
default:
panic("unexpected entry type")
}
atomic.StoreInt64(&s.raftIndex, e.Index)
atomic.StoreInt64(&s.raftTerm, e.Term)
atomic.StoreUint64(&s.raftIndex, e.Index)
atomic.StoreUint64(&s.raftTerm, e.Term)
appliedi = e.Index
}
@ -378,7 +378,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
return s.configure(ctx, cc)
}
func (s *EtcdServer) RemoveMember(ctx context.Context, id int64) error {
func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
cc := raftpb.ConfChange{
ID: GenID(),
Type: raftpb.ConfChangeRemoveNode,
@ -388,28 +388,28 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id int64) error {
}
// Implement the RaftTimer interface
func (s *EtcdServer) Index() int64 {
return atomic.LoadInt64(&s.raftIndex)
func (s *EtcdServer) Index() uint64 {
return atomic.LoadUint64(&s.raftIndex)
}
func (s *EtcdServer) Term() int64 {
return atomic.LoadInt64(&s.raftTerm)
func (s *EtcdServer) Term() uint64 {
return atomic.LoadUint64(&s.raftTerm)
}
// configure sends configuration change through consensus then performs it.
// It will block until the change is performed or there is an error.
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
ch := s.w.Register(cc.ID)
ch := s.w.Register(int64(cc.ID))
if err := s.node.ProposeConfChange(ctx, cc); err != nil {
log.Printf("configure error: %v", err)
s.w.Trigger(cc.ID, nil)
s.w.Trigger(int64(cc.ID), nil)
return err
}
select {
case <-ch:
return nil
case <-ctx.Done():
s.w.Trigger(cc.ID, nil) // GC wait
s.w.Trigger(int64(cc.ID), nil) // GC wait
return ctx.Err()
case <-s.done:
return ErrStopped
@ -423,7 +423,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
req := pb.Request{
Method: "SYNC",
ID: GenID(),
ID: int64(GenID()),
Time: time.Now().UnixNano(),
}
data, err := req.Marshal()
@ -454,7 +454,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
return
}
req := pb.Request{
ID: GenID(),
ID: int64(GenID()),
Method: "PUT",
Path: m.storeKey(),
Val: string(b),
@ -554,7 +554,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) {
}
// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi int64, snapnodes []int64) {
func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
d, err := s.store.Save()
// TODO: current store will never fail to do a snapshot
// what should we do if the store might fail?
@ -567,9 +567,9 @@ func (s *EtcdServer) snapshot(snapi int64, snapnodes []int64) {
// TODO: move the function to /id pkg maybe?
// GenID generates a random id that is not equal to 0.
func GenID() (n int64) {
func GenID() (n uint64) {
for n == 0 {
n = rand.Int63()
n = uint64(rand.Int63())
}
return
}
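GenID still draws from rand.Int63, so only the low 63 bits are ever random even though the return type is now uint64; the loop simply re-rolls the rare zero draw so callers can treat 0 as "no ID". Callers that hand the ID to pb.Request or the wait registry still cast it back to int64, which is why the int64(GenID()) conversions appear elsewhere in this commit. A standalone sketch, assuming math/rand is seeded at startup (the seeding call here is illustrative; the TestGenID sanity check later in this commit expects a non-default seed):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// genID mirrors GenID above: retry until the 63 random bits are non-zero.
func genID() (n uint64) {
	for n == 0 {
		n = uint64(rand.Int63())
	}
	return
}

func main() {
	rand.Seed(time.Now().UnixNano()) // assumption: the real seeding happens elsewhere
	fmt.Println(genID())
}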

View File

@ -371,7 +371,7 @@ func TestApplyRequest(t *testing.T) {
func TestClusterOf1(t *testing.T) { testServer(t, 1) }
func TestClusterOf3(t *testing.T) { testServer(t, 3) }
func testServer(t *testing.T, ns int64) {
func testServer(t *testing.T, ns uint64) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -384,12 +384,12 @@ func testServer(t *testing.T, ns int64) {
}
}
members := make([]int64, ns)
for i := int64(0); i < ns; i++ {
members := make([]uint64, ns)
for i := uint64(0); i < ns; i++ {
members[i] = i + 1
}
for i := int64(0); i < ns; i++ {
for i := uint64(0); i < ns; i++ {
id := i + 1
n := raft.StartNode(id, members, 10, 1)
tk := time.NewTicker(10 * time.Millisecond)
@ -458,7 +458,7 @@ func TestDoProposal(t *testing.T) {
for i, tt := range tests {
ctx, _ := context.WithCancel(context.Background())
n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
n := raft.StartNode(0xBAD0, []uint64{0xBAD0}, 10, 1)
st := &storeRecorder{}
tk := make(chan time.Time)
// this makes <-tk always successful, which accelerates internal clock
@ -491,7 +491,7 @@ func TestDoProposal(t *testing.T) {
func TestDoProposalCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// node cannot make any progress because there are two nodes
n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
n := raft.StartNode(0xBAD0, []uint64{0xBAD0, 0xBAD1}, 10, 1)
st := &storeRecorder{}
wait := &waitRecorder{}
srv := &EtcdServer{
@ -527,7 +527,7 @@ func TestDoProposalStopped(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// node cannot make any progress because there are two nodes
n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
n := raft.StartNode(0xBAD0, []uint64{0xBAD0, 0xBAD1}, 10, 1)
st := &storeRecorder{}
tk := make(chan time.Time)
// this makes <-tk always successful, which accelerates internal clock
@ -668,7 +668,7 @@ func TestSyncTrigger(t *testing.T) {
// snapshot should snapshot the store and cut the persistent
// TODO: node.Compact is called... we need to make the node an interface
func TestSnapshot(t *testing.T) {
n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
n := raft.StartNode(0xBAD0, []uint64{0xBAD0}, 10, 1)
defer n.Stop()
st := &storeRecorder{}
p := &storageRecorder{}
@ -678,7 +678,7 @@ func TestSnapshot(t *testing.T) {
node: n,
}
s.snapshot(0, []int64{1})
s.snapshot(0, []uint64{1})
gaction := st.Action()
if len(gaction) != 1 {
t.Fatalf("len(action) = %d, want 1", len(gaction))
@ -699,7 +699,7 @@ func TestSnapshot(t *testing.T) {
// Applied > SnapCount should trigger a SaveSnap event
func TestTriggerSnap(t *testing.T) {
ctx := context.Background()
n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
n := raft.StartNode(0xBAD0, []uint64{0xBAD0}, 10, 1)
n.Campaign(ctx)
st := &storeRecorder{}
p := &storageRecorder{}
@ -712,7 +712,7 @@ func TestTriggerSnap(t *testing.T) {
}
s.start()
for i := 0; int64(i) < s.snapCount-1; i++ {
for i := 0; uint64(i) < s.snapCount-1; i++ {
s.Do(ctx, pb.Request{Method: "PUT", ID: 1})
}
time.Sleep(time.Millisecond)
@ -825,7 +825,7 @@ func TestRemoveMember(t *testing.T) {
ClusterStore: cs,
}
s.start()
id := int64(1)
id := uint64(1)
s.RemoveMember(context.TODO(), id)
gaction := n.Action()
s.Stop()
@ -962,9 +962,9 @@ func TestGenID(t *testing.T) {
// Sanity check that the GenID function has been seeded appropriately
// (math/rand is seeded with 1 by default)
r := rand.NewSource(int64(1))
var n int64
var n uint64
for n == 0 {
n = r.Int63()
n = uint64(r.Int63())
}
if n == GenID() {
t.Fatalf("GenID's rand seeded with 1!")
@ -1143,7 +1143,7 @@ func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {}
func (n *readyNode) Stop() {}
func (n *readyNode) Compact(index int64, nodes []int64, d []byte) {}
func (n *readyNode) Compact(index uint64, nodes []uint64, d []byte) {}
type nodeRecorder struct {
recorder
@ -1175,7 +1175,7 @@ func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
func (n *nodeRecorder) Stop() {
n.record(action{name: "Stop"})
}
func (n *nodeRecorder) Compact(index int64, nodes []int64, d []byte) {
func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
n.record(action{name: "Compact"})
}
@ -1255,7 +1255,7 @@ func (cs *clusterStoreRecorder) Get() Cluster {
cs.record(action{name: "Get"})
return nil
}
func (cs *clusterStoreRecorder) Remove(id int64) {
func (cs *clusterStoreRecorder) Remove(id uint64) {
cs.record(action{name: "Remove", params: []interface{}{id}})
}

View File

@ -154,7 +154,7 @@ func startEtcd() {
Name: *name,
ClientURLs: acurls,
DataDir: *dir,
SnapCount: int64(*snapCount),
SnapCount: *snapCount,
Cluster: cluster,
DiscoveryURL: *durl,
ClusterState: *clusterState,

View File

@ -1,8 +1,8 @@
package types
// Int64Slice implements sort interface
type Int64Slice []int64
// Uint64Slice implements sort interface
type Uint64Slice []uint64
func (p Int64Slice) Len() int { return len(p) }
func (p Int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p Int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p Uint64Slice) Len() int { return len(p) }
func (p Uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p Uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
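A short usage sketch of the renamed sort adapter; this is how Cluster.IDs (earlier in this commit) gets its ascending order. The package wrapper and sample values are illustrative.

package main

import (
	"fmt"
	"sort"
)

type Uint64Slice []uint64

func (p Uint64Slice) Len() int           { return len(p) }
func (p Uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p Uint64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

func main() {
	ids := []uint64{100, 4, 1}
	sort.Sort(Uint64Slice(ids))
	fmt.Println(ids) // [1 4 100]
}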

View File

@ -896,16 +896,10 @@ func TestRecvMsgBeat(t *testing.T) {
func TestRestore(t *testing.T) {
s := pb.Snapshot{
<<<<<<< HEAD
Index: defaultCompactThreshold + 1,
Term: defaultCompactThreshold + 1,
Nodes: []int64{1, 2, 3},
RemovedNodes: []int64{4, 5},
=======
Index: defaultCompactThreshold + 1,
Term: defaultCompactThreshold + 1,
Nodes: []uint64{1, 2, 3},
>>>>>>> raft: int64 -> uint64
Nodes: []uint64{1, 2, 3},
RemovedNodes: []uint64{4, 5},
}
sm := newRaft(1, []uint64{1, 2}, 10, 1)

View File

@ -14,7 +14,7 @@ import (
var testSnap = &raftpb.Snapshot{
Data: []byte("some snapshot"),
Nodes: []int64{1, 2, 3},
Nodes: []uint64{1, 2, 3},
Index: 1,
Term: 1,
}

View File

@ -17,7 +17,7 @@ func Exist(dirpath string) bool {
// searchIndex returns the last array index of names whose raft index section is
// equal to or smaller than the given index.
// The given names MUST be sorted.
func searchIndex(names []string, index int64) (int, bool) {
func searchIndex(names []string, index uint64) (int, bool) {
for i := len(names) - 1; i >= 0; i-- {
name := names[i]
_, curIndex, err := parseWalName(name)
@ -34,7 +34,7 @@ func searchIndex(names []string, index int64) (int, bool) {
// names should have been sorted based on sequence number.
// isValidSeq checks whether seq increases continuously.
func isValidSeq(names []string) bool {
var lastSeq int64
var lastSeq uint64
for _, name := range names {
curSeq, _, err := parseWalName(name)
if err != nil {
@ -74,7 +74,7 @@ func checkWalNames(names []string) []string {
return wnames
}
func parseWalName(str string) (seq, index int64, err error) {
func parseWalName(str string) (seq, index uint64, err error) {
var num int
num, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
if num != 2 && err == nil {
@ -83,7 +83,7 @@ func parseWalName(str string) (seq, index int64, err error) {
return
}
func walName(seq, index int64) string {
func walName(seq, index uint64) string {
return fmt.Sprintf("%016x-%016x.wal", seq, index)
}
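A round-trip sketch of the WAL file-name scheme above, with arbitrary seq/index values; the field-count check from the real parseWalName is trimmed.

package main

import "fmt"

// walName encodes sequence and raft index as zero-padded hex, as above.
func walName(seq, index uint64) string {
	return fmt.Sprintf("%016x-%016x.wal", seq, index)
}

// parseWalName recovers both values from a WAL file name.
func parseWalName(str string) (seq, index uint64, err error) {
	_, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
	return seq, index, err
}

func main() {
	name := walName(3, 0xBAD0)
	fmt.Println(name) // 0000000000000003-000000000000bad0.wal
	seq, index, _ := parseWalName(name)
	fmt.Println(seq, index) // 3 47824
}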

View File

@ -56,12 +56,12 @@ var (
type WAL struct {
dir string // the living directory of the underlay files
ri int64 // index of entry to start reading
ri uint64 // index of entry to start reading
decoder *decoder // decoder to decode records
f *os.File // underlay file opened for appending, sync
seq int64 // sequence of the wal file currently used for writes
enti int64 // index of the last entry saved to the wal
seq uint64 // sequence of the wal file currently used for writes
enti uint64 // index of the last entry saved to the wal
encoder *encoder // encoder to encode records
}
@ -98,7 +98,7 @@ func Create(dirpath string) (*WAL, error) {
// The returned WAL is ready to read and the first record will be the given
// index. The WAL cannot be appended to before reading out all of its
// previous records.
func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
names, err := readDir(dirpath)
if err != nil {
return nil, err
@ -154,7 +154,7 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
// ReadAll reads out all records of the current WAL.
// If it cannot read out the expected entry, it will return ErrIndexNotFound.
// After ReadAll, the WAL will be ready for appending new records.
func (w *WAL) ReadAll() (id int64, state raftpb.HardState, ents []raftpb.Entry, err error) {
func (w *WAL) ReadAll() (id uint64, state raftpb.HardState, ents []raftpb.Entry, err error) {
rec := &walpb.Record{}
decoder := w.decoder

View File

@ -165,7 +165,7 @@ func TestRecover(t *testing.T) {
if err != nil {
t.Fatal(err)
}
i := &raftpb.Info{ID: int64(0xBAD0)}
i := &raftpb.Info{ID: uint64(0xBAD0)}
if err = w.SaveInfo(i); err != nil {
t.Fatal(err)
}
@ -207,7 +207,7 @@ func TestRecover(t *testing.T) {
func TestSearchIndex(t *testing.T) {
tests := []struct {
names []string
index int64
index uint64
widx int
wok bool
}{
@ -250,7 +250,7 @@ func TestSearchIndex(t *testing.T) {
func TestScanWalName(t *testing.T) {
tests := []struct {
str string
wseq, windex int64
wseq, windex uint64
wok bool
}{
{"0000000000000000-0000000000000000.wal", 0, 0, true},
@ -282,7 +282,7 @@ func TestRecoverAfterCut(t *testing.T) {
if err != nil {
t.Fatal(err)
}
info := &raftpb.Info{ID: int64(0xBAD1)}
info := &raftpb.Info{ID: uint64(0xBAD1)}
if err = w.SaveInfo(info); err != nil {
t.Fatal(err)
}
@ -294,7 +294,7 @@ func TestRecoverAfterCut(t *testing.T) {
t.Fatal(err)
}
for i := 1; i < 10; i++ {
e := raftpb.Entry{Index: int64(i)}
e := raftpb.Entry{Index: uint64(i)}
if err = w.SaveEntry(&e); err != nil {
t.Fatal(err)
}
@ -312,7 +312,7 @@ func TestRecoverAfterCut(t *testing.T) {
}
for i := 0; i < 10; i++ {
w, err := OpenAtIndex(p, int64(i))
w, err := OpenAtIndex(p, uint64(i))
if err != nil {
if i <= 4 {
if err != ErrFileNotFound {
@ -332,7 +332,7 @@ func TestRecoverAfterCut(t *testing.T) {
t.Errorf("#%d: id = %d, want %d", i, id, info.ID)
}
for j, e := range entries {
if e.Index != int64(j+i) {
if e.Index != uint64(j+i) {
t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i)
}
}