mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: fix many typos
This commit is contained in:
parent
4ba1ec6a4d
commit
20461ab11a
@ -98,7 +98,7 @@ type Config struct {
|
||||
// CheckRedirect specifies the policy for handling HTTP redirects.
|
||||
// If CheckRedirect is not nil, the Client calls it before
|
||||
// following an HTTP redirect. The sole argument is the number of
|
||||
// requests that have alrady been made. If CheckRedirect returns
|
||||
// requests that have already been made. If CheckRedirect returns
|
||||
// an error, Client.Do will not make any further requests and return
|
||||
// the error back it to the caller.
|
||||
//
|
||||
|
@ -27,7 +27,7 @@ var (
|
||||
|
||||
type srvDiscover struct{}
|
||||
|
||||
// NewSRVDiscover constructs a new Dicoverer that uses the stdlib to lookup SRV records.
|
||||
// NewSRVDiscover constructs a new Discoverer that uses the stdlib to lookup SRV records.
|
||||
func NewSRVDiscover() Discoverer {
|
||||
return &srvDiscover{}
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ func (c *Client) ActiveConnection() *grpc.ClientConn {
|
||||
return c.conn
|
||||
}
|
||||
|
||||
// refreshConnection establishes a new connection
|
||||
// retryConnection establishes a new connection
|
||||
func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.ClientConn, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
@ -178,7 +178,7 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli
|
||||
return c.conn, nil
|
||||
}
|
||||
|
||||
// dialEndpoints attempts to connect to each endpoint in order until a
|
||||
// dialEndpointList attempts to connect to each endpoint in order until a
|
||||
// connection is established.
|
||||
func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
|
||||
var err error
|
||||
|
@ -40,7 +40,7 @@ type KV interface {
|
||||
Put(key, val string, leaseID lease.LeaseID) (*PutResponse, error)
|
||||
|
||||
// Range gets the keys [key, end) in the range at rev.
|
||||
// If revev <=0, range gets the keys at currentRev.
|
||||
// If rev <=0, range gets the keys at currentRev.
|
||||
// Limit limits the number of keys returned.
|
||||
// If the required rev is compacted, ErrCompacted will be returned.
|
||||
Range(key, end string, limit, rev int64, sort *SortOption) (*RangeResponse, error)
|
||||
|
@ -123,7 +123,7 @@ func TestProposeOnCommit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestCloseBeforeReplay tests closing the producer before raft starts.
|
||||
// TestCloseProposerBeforeReplay tests closing the producer before raft starts.
|
||||
func TestCloseProposerBeforeReplay(t *testing.T) {
|
||||
clus := newCluster(1)
|
||||
// close before replay so raft never starts
|
||||
|
@ -30,7 +30,7 @@ func NewElection(client *clientv3.Client, keyPrefix string) *Election {
|
||||
return &Election{client, keyPrefix, nil}
|
||||
}
|
||||
|
||||
// Volunteer puts a value as elegible for the election. It blocks until
|
||||
// Volunteer puts a value as eligible for the election. It blocks until
|
||||
// it is elected or an error occurs (cannot withdraw candidacy)
|
||||
func (e *Election) Volunteer(val string) error {
|
||||
if e.leaderKey != nil {
|
||||
|
@ -214,12 +214,12 @@ func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, err
|
||||
return &EphemeralKV{*k}, nil
|
||||
}
|
||||
|
||||
// NewEphemeralKey creates a new unique valueless key associated with a session lease
|
||||
// NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease
|
||||
func NewUniqueEphemeralKey(client *clientv3.Client, prefix string) (*EphemeralKV, error) {
|
||||
return NewUniqueEphemeralKV(client, prefix, "")
|
||||
}
|
||||
|
||||
// NewEphemeralKV creates a new unique key/value pair associated with a session lease
|
||||
// NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
|
||||
func NewUniqueEphemeralKV(client *clientv3.Client, prefix, val string) (ek *EphemeralKV, err error) {
|
||||
for {
|
||||
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
"github.com/coreos/etcd/lease"
|
||||
)
|
||||
|
||||
// only keep one ephemeral lease per clientection
|
||||
// only keep one ephemeral lease per client
|
||||
var clientLeases clientLeaseMgr = clientLeaseMgr{leases: make(map[*clientv3.Client]*leaseKeepAlive)}
|
||||
|
||||
type clientLeaseMgr struct {
|
||||
@ -45,7 +45,7 @@ func SessionLeaseTTL(client *clientv3.Client, ttl int64) (lease.LeaseID, error)
|
||||
}
|
||||
|
||||
// StopSessionLease ends the refresh for the session lease. This is useful
|
||||
// in case the state of the client clientection is indeterminate (revoke
|
||||
// in case the state of the client connection is indeterminate (revoke
|
||||
// would fail) or if transferring lease ownership.
|
||||
func StopSessionLease(client *clientv3.Client) {
|
||||
clientLeases.mu.Lock()
|
||||
@ -95,7 +95,7 @@ func (clm *clientLeaseMgr) sessionLease(client *clientv3.Client, ttl int64) (lea
|
||||
lka := &leaseKeepAlive{id: id, donec: make(chan struct{})}
|
||||
clm.leases[client] = lka
|
||||
|
||||
// keep the lease alive until clientection error
|
||||
// keep the lease alive until client error
|
||||
go func() {
|
||||
defer func() {
|
||||
keepAlive.CloseSend()
|
||||
|
@ -39,7 +39,7 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Dequeue returns Enqueued()'d items in FIFO order. If the
|
||||
// Dequeue returns Enqueue()'d items in FIFO order. If the
|
||||
// queue is empty, Dequeue blocks until items are available.
|
||||
func (q *PriorityQueue) Dequeue() (string, error) {
|
||||
// TODO: fewer round trips by fetching more than one key
|
||||
|
@ -34,7 +34,7 @@ func (q *Queue) Enqueue(val string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Dequeue returns Enqueued()'d elements in FIFO order. If the
|
||||
// Dequeue returns Enqueue()'d elements in FIFO order. If the
|
||||
// queue is empty, Dequeue blocks until elements are available.
|
||||
func (q *Queue) Dequeue() (string, error) {
|
||||
// TODO: fewer round trips by fetching more than one key
|
||||
|
@ -55,7 +55,7 @@ func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error {
|
||||
// Abort abandons the apply loop, letting the transaction close without a commit.
|
||||
func (s *STM) Abort() { s.aborted = true }
|
||||
|
||||
// Get returns the value for a given key, inserting the key into the txn's readset.
|
||||
// Get returns the value for a given key, inserting the key into the txn's rset.
|
||||
func (s *STM) Get(key string) (string, error) {
|
||||
if wv, ok := s.wset[key]; ok {
|
||||
return wv, nil
|
||||
|
@ -329,9 +329,9 @@ func TestCreateSelf(t *testing.T) {
|
||||
{c, nil},
|
||||
// client.create returns an error
|
||||
{errc, errc.err},
|
||||
// watcher.next retuens an error
|
||||
// watcher.next returns an error
|
||||
{errwc, errw.err},
|
||||
// parse key exist error to duplciate ID error
|
||||
// parse key exist error to duplicate ID error
|
||||
{errdupc, ErrDuplicateID},
|
||||
}
|
||||
|
||||
|
@ -406,8 +406,8 @@ func startProxy(cfg *config) error {
|
||||
clientURLs := []string{}
|
||||
uf := func() []string {
|
||||
gcls, err := etcdserver.GetClusterFromRemotePeers(peerURLs, tr)
|
||||
// TODO: remove the 2nd check when we fix GetClusterFromPeers
|
||||
// GetClusterFromPeers should not return nil error with an invalid empty cluster
|
||||
// TODO: remove the 2nd check when we fix GetClusterFromRemotePeers
|
||||
// GetClusterFromRemotePeers should not return nil error with an invalid empty cluster
|
||||
if err != nil {
|
||||
plog.Warningf("proxy: %v", err)
|
||||
return []string{}
|
||||
|
@ -673,7 +673,7 @@ func TestIsReadyToRemoveMember(t *testing.T) {
|
||||
},
|
||||
{
|
||||
// 1/2 members ready, should be fine to remove unstarted member
|
||||
// (iReadyToRemoveMember() logic should return success, but operation itself would fail)
|
||||
// (isReadyToRemoveMember() logic should return success, but operation itself would fail)
|
||||
[]*Member{
|
||||
newTestMember(1, nil, "1", nil),
|
||||
newTestMember(2, nil, "", nil),
|
||||
|
@ -56,7 +56,7 @@ type ServerConfig struct {
|
||||
EnablePprof bool
|
||||
}
|
||||
|
||||
// VerifyBootstrapConfig sanity-checks the initial config for bootstrap case
|
||||
// VerifyBootstrap sanity-checks the initial config for bootstrap case
|
||||
// and returns an error for things that should never happen.
|
||||
func (c *ServerConfig) VerifyBootstrap() error {
|
||||
if err := c.verifyLocalMember(true); err != nil {
|
||||
|
@ -32,7 +32,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
// capabilityMap is a static map of version to capability map.
|
||||
// capabilityMaps is a static map of version to capability map.
|
||||
// the base capabilities is the set of capability 2.0 supports.
|
||||
capabilityMaps = map[string]map[capability]bool{
|
||||
"2.1.0": {authCapability: true},
|
||||
@ -41,12 +41,12 @@ var (
|
||||
}
|
||||
|
||||
enableMapMu sync.Mutex
|
||||
// enabled points to a map in cpapbilityMaps
|
||||
// enabledMap points to a map in capabilityMaps
|
||||
enabledMap map[capability]bool
|
||||
)
|
||||
|
||||
// capabilityLoop checks the cluster version every 500ms and updates
|
||||
// the enabledCapability when the cluster version increased.
|
||||
// the enabledMap when the cluster version increased.
|
||||
// capabilityLoop MUST be ran in a goroutine before checking capability
|
||||
// or using capabilityHandler.
|
||||
func capabilityLoop(s *etcdserver.EtcdServer) {
|
||||
|
@ -361,7 +361,7 @@ func serveVars(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// TODO: change etcdserver to raft interface when we have it.
|
||||
// add test for healthHeadler when we have the interface ready.
|
||||
// add test for healthHandler when we have the interface ready.
|
||||
func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !allowMethod(w, r.Method, "GET") {
|
||||
|
@ -42,7 +42,7 @@ var (
|
||||
|
||||
// writeError logs and writes the given Error to the ResponseWriter
|
||||
// If Error is an etcdErr, it is rendered to the ResponseWriter
|
||||
// Otherwise, it is assumed to be an InternalServerError
|
||||
// Otherwise, it is assumed to be a StatusInternalServerError
|
||||
func writeError(w http.ResponseWriter, r *http.Request, err error) {
|
||||
if err == nil {
|
||||
return
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
)
|
||||
|
||||
// TestNewPeerHandler tests that NewPeerHandler returns a handler that
|
||||
// TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
|
||||
// handles raft-prefix requests well.
|
||||
func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
|
||||
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -378,7 +378,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
|
||||
// the entries. The given snapshot/entries can contain two kinds of
|
||||
// ID-related entry:
|
||||
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
|
||||
// - ConfChangeAddRemove, in which case the contained ID will be removed from the set.
|
||||
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
|
||||
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
|
||||
ids := make(map[uint64]bool)
|
||||
if snap != nil {
|
||||
|
@ -64,7 +64,7 @@ const (
|
||||
|
||||
purgeFileInterval = 30 * time.Second
|
||||
// monitorVersionInterval should be smaller than the timeout
|
||||
// on the connection. Or we will not be able to resue the connection
|
||||
// on the connection. Or we will not be able to reuse the connection
|
||||
// (since it will timeout).
|
||||
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
|
||||
|
||||
@ -486,7 +486,7 @@ type etcdProgress struct {
|
||||
appliedi uint64
|
||||
}
|
||||
|
||||
// newApplier buffers apply operations so raftNode won't block on sending
|
||||
// startApplier buffers apply operations so raftNode won't block on sending
|
||||
// new applies, timing out (since applies can be slow). The goroutine begins
|
||||
// shutdown on close(s.done) and closes the returned channel when finished.
|
||||
func (s *EtcdServer) startApplier(ep etcdProgress) <-chan struct{} {
|
||||
@ -616,7 +616,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
plog.Panicf("rename snapshot file error: %v", err)
|
||||
}
|
||||
|
||||
// TODO: recover leassor
|
||||
// TODO: recover lessor
|
||||
|
||||
newbe := backend.NewDefaultBackend(fn)
|
||||
if err := s.kv.Restore(newbe); err != nil {
|
||||
@ -1216,7 +1216,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
|
||||
return s.cluster.Version()
|
||||
}
|
||||
|
||||
// monitorVersions checks the member's version every monitorVersion interval.
|
||||
// monitorVersions checks the member's version every monitorVersionInterval.
|
||||
// It updates the cluster version if all members agrees on a higher one.
|
||||
// It prints out log if there is a member with a higher version than the
|
||||
// local version.
|
||||
|
@ -110,7 +110,7 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,
|
||||
return
|
||||
}
|
||||
|
||||
// upgradeWAL converts an older version of the etcdServer data to the newest version.
|
||||
// upgradeDataDir converts an older version of the etcdServer data to the newest version.
|
||||
// It must ensure that, after upgrading, the most recent version is present.
|
||||
func upgradeDataDir(baseDataDir string, name string, ver version.DataDirVersion) error {
|
||||
switch ver {
|
||||
|
@ -431,7 +431,7 @@ func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, client
|
||||
return m
|
||||
}
|
||||
|
||||
// startGRPC starts a grpc server over a unix domain socket on the member
|
||||
// listenGRPC starts a grpc server over a unix domain socket on the member
|
||||
func (m *member) listenGRPC() error {
|
||||
if m.V3demo == false {
|
||||
return fmt.Errorf("starting grpc server without v3 configured")
|
||||
|
@ -275,7 +275,7 @@ func TestIssue2904(t *testing.T) {
|
||||
ma.Remove(ctx, c.Members[1].s.ID().String())
|
||||
cancel()
|
||||
|
||||
// restart member, and expect it to send updateAttr request.
|
||||
// restart member, and expect it to send UpdateAttributes request.
|
||||
// the log in the leader is like this:
|
||||
// [..., remove 1, ..., update attr 1, ...]
|
||||
c.Members[1].Restart(t)
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
"github.com/coreos/etcd/contrib/recipes"
|
||||
)
|
||||
|
||||
// TestElectionWait tests if followers can correcty wait for elections.
|
||||
// TestElectionWait tests if followers can correctly wait for elections.
|
||||
func TestElectionWait(t *testing.T) {
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
@ -35,7 +35,7 @@ func TestMutexMultiNode(t *testing.T) {
|
||||
}
|
||||
|
||||
func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
|
||||
// stream lock acquistions
|
||||
// stream lock acquisitions
|
||||
lockedC := make(chan *recipe.Mutex, 1)
|
||||
for i := 0; i < waiters; i++ {
|
||||
go func() {
|
||||
|
@ -82,7 +82,7 @@ func BenchmarkQueue(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestPrQueue tests whether priority queues respect priorities.
|
||||
// TestPrQueueOneReaderOneWriter tests whether priority queues respect priorities.
|
||||
func TestPrQueueOneReaderOneWriter(t *testing.T) {
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
@ -87,7 +87,7 @@ func TestSTMConflict(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSTMPut confirms a STM put on a new key is visible after commit.
|
||||
// TestSTMPutNewKey confirms a STM put on a new key is visible after commit.
|
||||
func TestSTMPutNewKey(t *testing.T) {
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
@ -56,7 +56,7 @@ type RangeDeleter interface {
|
||||
|
||||
// A Lessor is the owner of leases. It can grant, revoke, renew and modify leases for lessee.
|
||||
type Lessor interface {
|
||||
// SetDeleteableRange sets the RangeDeleter to the Lessor.
|
||||
// SetRangeDeleter sets the RangeDeleter to the Lessor.
|
||||
// Lessor deletes the items in the revoked or expired lease from the
|
||||
// the set RangeDeleter.
|
||||
SetRangeDeleter(dr RangeDeleter)
|
||||
@ -117,9 +117,9 @@ type lessor struct {
|
||||
// TODO: probably this should be a heap with a secondary
|
||||
// id index.
|
||||
// Now it is O(N) to loop over the leases to find expired ones.
|
||||
// We want to make Grant, Revoke, and FindExpired all O(logN) and
|
||||
// We want to make Grant, Revoke, and findExpiredLeases all O(logN) and
|
||||
// Renew O(1).
|
||||
// FindExpired and Renew should be the most frequent operations.
|
||||
// findExpiredLeases and Renew should be the most frequent operations.
|
||||
leaseMap map[LeaseID]*Lease
|
||||
|
||||
// When a lease expires, the lessor will delete the
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
|
||||
// TestLessorGrant ensures Lessor can grant wanted lease.
|
||||
// The granted lease should have a unique ID with a term
|
||||
// that is greater than minLeaseTerm.
|
||||
// that is greater than minLeaseTTL.
|
||||
func TestLessorGrant(t *testing.T) {
|
||||
dir, be := NewTestBackend(t)
|
||||
defer os.RemoveAll(dir)
|
||||
@ -72,7 +72,7 @@ func TestLessorGrant(t *testing.T) {
|
||||
|
||||
// TestLessorRevoke ensures Lessor can revoke a lease.
|
||||
// The items in the revoked lease should be removed from
|
||||
// the DeleteableKV.
|
||||
// the backend.
|
||||
// The revoked lease cannot be got from Lessor again.
|
||||
func TestLessorRevoke(t *testing.T) {
|
||||
dir, be := NewTestBackend(t)
|
||||
|
@ -59,7 +59,7 @@ func ReadDir(dirpath string) ([]string, error) {
|
||||
return names, nil
|
||||
}
|
||||
|
||||
// TouchDirAll is simliar to os.MkdirAll. It creates directories with 0700 permission if any directory
|
||||
// TouchDirAll is similar to os.MkdirAll. It creates directories with 0700 permission if any directory
|
||||
// does not exists. TouchDirAll also ensures the given directory is writable.
|
||||
func TouchDirAll(dir string) error {
|
||||
err := os.MkdirAll(dir, privateDirMode)
|
||||
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package timeutil providese time-related utility functions.
|
||||
// Package timeutil provides time-related utility functions.
|
||||
package timeutil
|
||||
|
||||
import "time"
|
||||
|
@ -31,7 +31,7 @@ func NewTimeoutTransport(info TLSInfo, dialtimeoutd, rdtimeoutd, wtimeoutd time.
|
||||
}
|
||||
|
||||
if rdtimeoutd != 0 || wtimeoutd != 0 {
|
||||
// the timeouted connection will timeout soon after it is idle.
|
||||
// the timed out connection will timeout soon after it is idle.
|
||||
// it should not be put back to http transport as an idle connection for future usage.
|
||||
tr.MaxIdleConnsPerHost = -1
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ func NewURLsMap(s string) (URLsMap, error) {
|
||||
return cl, nil
|
||||
}
|
||||
|
||||
// String returns NameURLPairs into discovery-formatted name-to-URLs sorted by name.
|
||||
// String turns URLsMap into discovery-formatted name-to-URLs sorted by name.
|
||||
func (c URLsMap) String() string {
|
||||
var pairs []string
|
||||
for name, urls := range c {
|
||||
|
@ -33,7 +33,7 @@ func ExampleNode() {
|
||||
// the last known state
|
||||
var prev pb.HardState
|
||||
for {
|
||||
// ReadState blocks until there is new state ready.
|
||||
// Ready blocks until there is new state ready.
|
||||
rd := <-n.Ready()
|
||||
if !isHardStateEqual(prev, rd.HardState) {
|
||||
saveStateToDisk(rd.HardState)
|
||||
|
@ -273,7 +273,7 @@ func TestLogMaybeAppend(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestCompactionSideEffects ensures that all the log related funcationality works correctly after
|
||||
// TestCompactionSideEffects ensures that all the log related functionality works correctly after
|
||||
// a compaction.
|
||||
func TestCompactionSideEffects(t *testing.T) {
|
||||
var i uint64
|
||||
|
@ -51,7 +51,7 @@ func (u *unstable) maybeLastIndex() (uint64, bool) {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// myabeTerm returns the term of the entry at index i, if there
|
||||
// maybeTerm returns the term of the entry at index i, if there
|
||||
// is any.
|
||||
func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
|
||||
if i < u.offset {
|
||||
@ -79,7 +79,7 @@ func (u *unstable) stableTo(i, t uint64) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// if i < offest, term is matched with the snapshot
|
||||
// if i < offset, term is matched with the snapshot
|
||||
// only update the unstable entries if term is matched with
|
||||
// an unstable entry.
|
||||
if gt == t && i >= u.offset {
|
||||
|
@ -145,7 +145,7 @@ type Peer struct {
|
||||
func StartNode(c *Config, peers []Peer) Node {
|
||||
r := newRaft(c)
|
||||
// become the follower at term 1 and apply initial configuration
|
||||
// entires of term 1
|
||||
// entries of term 1
|
||||
r.becomeFollower(1, None)
|
||||
for _, peer := range peers {
|
||||
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
|
||||
|
@ -77,8 +77,8 @@ type Config struct {
|
||||
// message to maintain the leadership every heartbeat interval.
|
||||
HeartbeatTick int
|
||||
|
||||
// Storage is the storage for raft. raft generates entires and
|
||||
// states to be stored in storage. raft reads the persisted entires
|
||||
// Storage is the storage for raft. raft generates entries and
|
||||
// states to be stored in storage. raft reads the persisted entries
|
||||
// and states out of Storage when it needs. raft reads out the previous
|
||||
// state and configuration out of storage when restarting.
|
||||
Storage Storage
|
||||
|
@ -58,7 +58,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
|
||||
// TestMsgAppFlowControlMoveForward ensures msgAppResp can move
|
||||
// forward the sending window correctly:
|
||||
// 1. valid msgAppResp.index moves the windows to pass all smaller or equal index.
|
||||
// 2. out-of-dated msgAppResp has no effect on the silding window.
|
||||
// 2. out-of-dated msgAppResp has no effect on the sliding window.
|
||||
func TestMsgAppFlowControlMoveForward(t *testing.T) {
|
||||
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||
r.becomeCandidate()
|
||||
|
@ -36,7 +36,7 @@ func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
|
||||
return ents
|
||||
}
|
||||
|
||||
type Interface interface {
|
||||
type stateMachine interface {
|
||||
Step(m pb.Message) error
|
||||
readMessages() []pb.Message
|
||||
}
|
||||
@ -939,7 +939,7 @@ func TestHandleHeartbeatResp(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestMsgAppRespWaitReset verifies the waitReset behavior of a leader
|
||||
// TestMsgAppRespWaitReset verifies the resume behavior of a leader
|
||||
// MsgAppResp.
|
||||
func TestMsgAppRespWaitReset(t *testing.T) {
|
||||
sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
|
||||
@ -1920,7 +1920,7 @@ func ents(terms ...uint64) *raft {
|
||||
}
|
||||
|
||||
type network struct {
|
||||
peers map[uint64]Interface
|
||||
peers map[uint64]stateMachine
|
||||
storage map[uint64]*MemoryStorage
|
||||
dropm map[connem]float64
|
||||
ignorem map[pb.MessageType]bool
|
||||
@ -1930,11 +1930,11 @@ type network struct {
|
||||
// A nil node will be replaced with a new *stateMachine.
|
||||
// A *stateMachine will get its k, id.
|
||||
// When using stateMachine, the address list is always [1, n].
|
||||
func newNetwork(peers ...Interface) *network {
|
||||
func newNetwork(peers ...stateMachine) *network {
|
||||
size := len(peers)
|
||||
peerAddrs := idsBySize(size)
|
||||
|
||||
npeers := make(map[uint64]Interface, size)
|
||||
npeers := make(map[uint64]stateMachine, size)
|
||||
nstorage := make(map[uint64]*MemoryStorage, size)
|
||||
|
||||
for j, p := range peers {
|
||||
|
@ -65,7 +65,7 @@ func TestNetworkDelay(t *testing.T) {
|
||||
}
|
||||
|
||||
w := time.Duration(float64(sent)*delayrate/2) * delay
|
||||
// there are pretty overhead in the send call, since it genarete random numbers.
|
||||
// there is some overhead in the send call since it generates random numbers.
|
||||
if total < w {
|
||||
t.Errorf("total = %v, want > %v", total, w)
|
||||
}
|
||||
|
@ -6,7 +6,7 @@
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writinrawNode, software
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
@ -45,7 +45,7 @@ func TestRawNodeStep(t *testing.T) {
|
||||
// no goroutine in RawNode.
|
||||
|
||||
// TestRawNodeProposeAndConfChange ensures that RawNode.Propose and RawNode.ProposeConfChange
|
||||
// send the given proposal and ConfChangeto the underlying raft.
|
||||
// send the given proposal and ConfChange to the underlying raft.
|
||||
func TestRawNodeProposeAndConfChange(t *testing.T) {
|
||||
s := NewMemoryStorage()
|
||||
var err error
|
||||
|
@ -57,7 +57,7 @@ type Storage interface {
|
||||
// first log entry is not available).
|
||||
FirstIndex() (uint64, error)
|
||||
// Snapshot returns the most recent snapshot.
|
||||
// If snapshot is temporarily unavailable, it should return ErrTemporarilyUnavailable,
|
||||
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
|
||||
// so raft state machine could know that Storage needs some time to prepare
|
||||
// snapshot and call Snapshot later.
|
||||
Snapshot() (pb.Snapshot, error)
|
||||
@ -162,7 +162,7 @@ func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error {
|
||||
ms.Lock()
|
||||
defer ms.Unlock()
|
||||
|
||||
// TODO: return snapOutOfDate?
|
||||
// TODO: return ErrSnapOutOfDate?
|
||||
ms.snapshot = snap
|
||||
ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}}
|
||||
return nil
|
||||
|
@ -220,7 +220,7 @@ func TestStorageAppend(t *testing.T) {
|
||||
nil,
|
||||
[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 5}},
|
||||
},
|
||||
// tunncate the existing entries and append
|
||||
// truncate the existing entries and append
|
||||
{
|
||||
[]pb.Entry{{Index: 4, Term: 5}},
|
||||
nil,
|
||||
|
@ -113,7 +113,7 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
return
|
||||
}
|
||||
// Write StatusNoContet header after the message has been processed by
|
||||
// Write StatusNoContent header after the message has been processed by
|
||||
// raft, which facilitates the client to report MsgSnap status.
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
@ -192,7 +192,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
return
|
||||
}
|
||||
// Write StatusNoContet header after the message has been processed by
|
||||
// Write StatusNoContent header after the message has been processed by
|
||||
// raft, which facilitates the client to report MsgSnap status.
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
@ -27,10 +27,10 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// ConnRead/WriteTimeout is the i/o timeout set on each connection rafthttp pkg creates.
|
||||
// ConnReadTimeout and ConnWriteTimeout are the i/o timeout set on each connection rafthttp pkg creates.
|
||||
// A 5 seconds timeout is good enough for recycling bad connections. Or we have to wait for
|
||||
// tcp keepalive failing to detect a bad connection, which is at minutes level.
|
||||
// For long term streaming connections, rafthttp pkg sends application level linkHeartbeat
|
||||
// For long term streaming connections, rafthttp pkg sends application level linkHeartbeatMessage
|
||||
// to keep the connection alive.
|
||||
// For short term pipeline connections, the connection MUST be killed to avoid it being
|
||||
// put back to http pkg connection pool.
|
||||
@ -59,7 +59,7 @@ type Peer interface {
|
||||
// raft.
|
||||
send(m raftpb.Message)
|
||||
|
||||
// sendSanp sends the merged snapshot message to the remote peer. Its behavior
|
||||
// sendSnap sends the merged snapshot message to the remote peer. Its behavior
|
||||
// is similar to send.
|
||||
sendSnap(m snap.Message)
|
||||
|
||||
|
@ -37,7 +37,7 @@ import (
|
||||
// continuously, and closes it when stopped.
|
||||
func TestStreamWriterAttachOutgoingConn(t *testing.T) {
|
||||
sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
|
||||
// the expected initial state of streamWrite is not working
|
||||
// the expected initial state of streamWriter is not working
|
||||
if _, ok := sw.writec(); ok != false {
|
||||
t.Errorf("initial working status = %v, want false", ok)
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
|
||||
// TestURLPickerPickTwice tests that pick returns a possible url,
|
||||
// and always returns the same one.
|
||||
func TestURLPickerPick(t *testing.T) {
|
||||
func TestURLPickerPickTwice(t *testing.T) {
|
||||
picker := mustNewURLPicker(t, []string{"http://127.0.0.1:2380", "http://127.0.0.1:7001"})
|
||||
|
||||
u := picker.pick()
|
||||
|
@ -126,7 +126,7 @@ func checkPostResponse(resp *http.Response, body []byte, req *http.Request, to t
|
||||
}
|
||||
}
|
||||
|
||||
// reportErr reports the given error through sending it into
|
||||
// reportCriticalError reports the given error through sending it into
|
||||
// the given error channel.
|
||||
// If the error channel is filled up when sending error, it drops the error
|
||||
// because the fact that error has happened is reported, which is
|
||||
|
@ -207,7 +207,7 @@ func TestEmptySnapshot(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestAllSnapshotBroken ensures snapshotter returens
|
||||
// TestAllSnapshotBroken ensures snapshotter returns
|
||||
// ErrNoSnapshot if all the snapshots are broken.
|
||||
func TestAllSnapshotBroken(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
|
@ -46,7 +46,7 @@ type Backend interface {
|
||||
type Snapshot interface {
|
||||
// Size gets the size of the snapshot.
|
||||
Size() int64
|
||||
// WriteTo writes the snapshot into the given writter.
|
||||
// WriteTo writes the snapshot into the given writer.
|
||||
WriteTo(w io.Writer) (n int64, err error)
|
||||
// Close closes the snapshot.
|
||||
Close() error
|
||||
|
@ -196,7 +196,7 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
|
||||
}
|
||||
|
||||
// walk until reaching the first revision that has an revision smaller or equal to
|
||||
// the atRevision.
|
||||
// the atRev.
|
||||
// add it to the available map
|
||||
f := func(rev revision) bool {
|
||||
if rev.main <= atRev {
|
||||
@ -237,7 +237,7 @@ func (ki *keyIndex) isEmpty() bool {
|
||||
return len(ki.generations) == 1 && ki.generations[0].isEmpty()
|
||||
}
|
||||
|
||||
// findGeneartion finds out the generation of the keyIndex that the
|
||||
// findGeneration finds out the generation of the keyIndex that the
|
||||
// given rev belongs to. If the given rev is at the gap of two generations,
|
||||
// which means that the key does not exist at the given rev, it returns nil.
|
||||
func (ki *keyIndex) findGeneration(rev int64) *generation {
|
||||
|
@ -222,7 +222,7 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch
|
||||
return wa, cancel
|
||||
}
|
||||
|
||||
// syncWatchersLoop syncs the watcher in the unsyncd map every 100ms.
|
||||
// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
|
||||
func (s *watchableStore) syncWatchersLoop() {
|
||||
defer s.wg.Done()
|
||||
|
||||
|
@ -47,7 +47,7 @@ type WatchStream interface {
|
||||
// returned.
|
||||
Cancel(id WatchID) error
|
||||
|
||||
// Close closes the WatchChan and release all related resources.
|
||||
// Close closes Chan and release all related resources.
|
||||
Close()
|
||||
|
||||
// Rev returns the current revision of the KV the stream watches on.
|
||||
|
@ -149,7 +149,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchStreamCancel ensures cancel calls the cancel func of the watcher
|
||||
// TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher
|
||||
// with given id inside watchStream.
|
||||
func TestWatchStreamCancelWatcherByID(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
|
@ -429,7 +429,7 @@ func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Wa
|
||||
if sinceIndex == 0 {
|
||||
sinceIndex = s.CurrentIndex + 1
|
||||
}
|
||||
// WatchHub does not know about the current index, so we need to pass it in
|
||||
// WatcherHub does not know about the current index, so we need to pass it in
|
||||
w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -631,7 +631,7 @@ func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// deleteExpiredKyes will delete all
|
||||
// DeleteExpiredKeys will delete all expired keys
|
||||
func (s *store) DeleteExpiredKeys(cutoff time.Time) {
|
||||
s.worldLock.Lock()
|
||||
defer s.worldLock.Unlock()
|
||||
|
@ -65,7 +65,7 @@ func TestMinExpireTime(t *testing.T) {
|
||||
assert.Equal(t, e.Node.TTL, 0)
|
||||
}
|
||||
|
||||
// Ensure that the store can recrusively retrieve a directory listing.
|
||||
// Ensure that the store can recursively retrieve a directory listing.
|
||||
// Note that hidden files should not be returned.
|
||||
func TestStoreGetDirectory(t *testing.T) {
|
||||
s := newStore()
|
||||
@ -472,7 +472,7 @@ func TestStoreCompareAndDeletePrevValue(t *testing.T) {
|
||||
assert.Equal(t, e.Action, "compareAndDelete", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||
|
||||
// check pervNode
|
||||
// check prevNode
|
||||
assert.NotNil(t, e.PrevNode, "")
|
||||
assert.Equal(t, e.PrevNode.Key, "/foo", "")
|
||||
assert.Equal(t, *e.PrevNode.Value, "bar", "")
|
||||
@ -502,7 +502,7 @@ func TestStoreCompareAndDeletePrevIndex(t *testing.T) {
|
||||
assert.Nil(t, err, "")
|
||||
assert.Equal(t, e.EtcdIndex, eidx, "")
|
||||
assert.Equal(t, e.Action, "compareAndDelete", "")
|
||||
// check pervNode
|
||||
// check prevNode
|
||||
assert.NotNil(t, e.PrevNode, "")
|
||||
assert.Equal(t, e.PrevNode.Key, "/foo", "")
|
||||
assert.Equal(t, *e.PrevNode.Value, "bar", "")
|
||||
@ -545,7 +545,7 @@ func TestStoreCompareAndSwapPrevValue(t *testing.T) {
|
||||
assert.Equal(t, e.EtcdIndex, eidx, "")
|
||||
assert.Equal(t, e.Action, "compareAndSwap", "")
|
||||
assert.Equal(t, *e.Node.Value, "baz", "")
|
||||
// check pervNode
|
||||
// check prevNode
|
||||
assert.NotNil(t, e.PrevNode, "")
|
||||
assert.Equal(t, e.PrevNode.Key, "/foo", "")
|
||||
assert.Equal(t, *e.PrevNode.Value, "bar", "")
|
||||
|
@ -41,7 +41,7 @@ type watcherHub struct {
|
||||
EventHistory *EventHistory
|
||||
}
|
||||
|
||||
// newWatchHub creates a watchHub. The capacity determines how many events we will
|
||||
// newWatchHub creates a watcherHub. The capacity determines how many events we will
|
||||
// keep in the eventHistory.
|
||||
// Typically, we only need to keep a small size of history[smaller than 20K].
|
||||
// Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
|
||||
|
@ -35,7 +35,7 @@ var (
|
||||
GitSHA = "Not provided (use ./build instead of go build)"
|
||||
)
|
||||
|
||||
// WalVersion is an enum for versions of etcd logs.
|
||||
// DataDirVersion is an enum for versions of etcd logs.
|
||||
type DataDirVersion string
|
||||
|
||||
const (
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
)
|
||||
|
||||
// Repair tries to repair the unexpectedEOF error in the
|
||||
// Repair tries to repair ErrUnexpectedEOF in the
|
||||
// last wal file by truncating.
|
||||
func Repair(dirpath string) bool {
|
||||
f, err := openLast(dirpath)
|
||||
|
Loading…
x
Reference in New Issue
Block a user