etcd-io/etcd: Merge pull request #14419 from ahrtr/alarm_list_ci

Move consistent_index forward when executing alarmList operation

commit 3dc5348d94
@@ -1840,7 +1840,6 @@ func (s *EtcdServer) apply(
 // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
 func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	shouldApplyV3 := membership.ApplyV2storeOnly
-	applyV3Performed := false
 	var ar *apply.Result
 	index := s.consistIndex.ConsistentIndex()
 	if e.Index > index {
@@ -1850,7 +1849,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	defer func() {
 		// The txPostLockInsideApplyHook will not get called in some cases,
 		// in which we should move the consistent index forward directly.
-		if !applyV3Performed || (ar != nil && ar.Err != nil) {
+		newIndex := s.consistIndex.ConsistentIndex()
+		if newIndex < e.Index {
 			s.consistIndex.SetConsistentIndex(e.Index, e.Term)
 		}
 	}()
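
For readability, the post-patch defer block, assembled from the two hunks above (a reconstruction of the diff, not an additional change), reads:

    defer func() {
        // The txPostLockInsideApplyHook will not get called in some cases,
        // in which we should move the consistent index forward directly.
        newIndex := s.consistIndex.ConsistentIndex()
        if newIndex < e.Index {
            s.consistIndex.SetConsistentIndex(e.Index, e.Term)
        }
    }()

Comparing the stored consistent index against the entry index covers every apply path that skips the backend hook, including read-only requests such as AlarmList, without tracking a per-request flag.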
@@ -1903,7 +1903,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		if !needResult && raftReq.Txn != nil {
 			removeNeedlessRangeReqs(raftReq.Txn)
 		}
-		applyV3Performed = true
 		ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
 	}

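With the defer above comparing indexes directly, the applyV3Performed flag has no remaining readers, so both its declaration and this assignment are dropped. A minimal sketch of the failure mode being fixed, built from the guard already visible in the first hunk (simplified, for illustration only):

    // On apply, entries at or below the stored consistent index are skipped:
    index := s.consistIndex.ConsistentIndex()
    if e.Index > index {
        // not yet reflected in the backend: apply the entry
    }
    // If an AlarmList request were applied without moving consistIndex
    // forward, e.Index would still exceed the stored index after a
    // restart, and the same entry would be applied a second time.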
@@ -104,3 +104,28 @@ func TestAlarm(t *testing.T) {
 		}
 	})
 }
+
+func TestAlarmlistOnMemberRestart(t *testing.T) {
+	testRunner.BeforeTest(t)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	clus := testRunner.NewCluster(ctx, t, config.ClusterConfig{
+		ClusterSize:       1,
+		QuotaBackendBytes: int64(13 * os.Getpagesize()),
+		SnapshotCount:     5,
+	})
+	defer clus.Close()
+
+	testutils.ExecuteUntil(ctx, t, func() {
+		for i := 0; i < 6; i++ {
+			if _, err := clus.Client().AlarmList(ctx); err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+		}
+
+		clus.Members()[0].Stop()
+		if err := clus.Members()[0].Start(ctx); err != nil {
+			t.Fatalf("failed to start etcdserver: %v", err)
+		}
+	})
+}
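Assembled from the added lines, the new regression test reads as a single unit:

    func TestAlarmlistOnMemberRestart(t *testing.T) {
        testRunner.BeforeTest(t)
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        clus := testRunner.NewCluster(ctx, t, config.ClusterConfig{
            ClusterSize:       1,
            QuotaBackendBytes: int64(13 * os.Getpagesize()),
            SnapshotCount:     5,
        })
        defer clus.Close()

        testutils.ExecuteUntil(ctx, t, func() {
            for i := 0; i < 6; i++ {
                if _, err := clus.Client().AlarmList(ctx); err != nil {
                    t.Fatalf("Unexpected error: %v", err)
                }
            }

            clus.Members()[0].Stop()
            if err := clus.Members()[0].Start(ctx); err != nil {
                t.Fatalf("failed to start etcdserver: %v", err)
            }
        })
    }

Six AlarmList requests against a SnapshotCount of 5 appear chosen to force a snapshot at an index the consistent index would previously not have reached, so restarting the member exercises the fixed recovery path.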
@@ -32,4 +32,5 @@ type ClusterConfig struct {
 	ClientTLS                  TLSConfig
 	QuotaBackendBytes          int64
 	DisableStrictReconfigCheck bool
+	SnapshotCount              int
 }
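The new SnapshotCount field lets a test lower the snapshot threshold per cluster, as the test above does:

    clus := testRunner.NewCluster(ctx, t, config.ClusterConfig{
        ClusterSize:   1,
        SnapshotCount: 5, // take a snapshot after a handful of applied entries
    })

Presumably this maps to the server's snapshot-count setting in both runners; the hunks below plumb it through the e2e and integration frameworks.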
@@ -47,6 +47,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.Clus
 		ClusterSize:                cfg.ClusterSize,
 		QuotaBackendBytes:          cfg.QuotaBackendBytes,
 		DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck,
+		SnapshotCount:              cfg.SnapshotCount,
 	}
 	switch cfg.ClientTLS {
 	case config.NoTLS:
@@ -175,7 +176,7 @@ func (m e2eMember) Client() Client {
 }

 func (m e2eMember) Start(ctx context.Context) error {
-	return m.Restart(ctx)
+	return m.EtcdProcess.Start(ctx)
 }

 func (m e2eMember) Stop() {
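e2eMember.Start previously delegated to Restart, which stops the process before starting it again; it now starts the underlying EtcdProcess directly. That makes an explicit Stop followed by Start behave as a plain start, which is exactly the sequence the new test performs:

    clus.Members()[0].Stop()
    if err := clus.Members()[0].Start(ctx); err != nil {
        t.Fatalf("failed to start etcdserver: %v", err)
    }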
@@ -100,6 +100,7 @@ func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2
 func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} }

 func (ep *EtcdServerProcess) Start(ctx context.Context) error {
+	ep.donec = make(chan struct{})
 	if ep.proc != nil {
 		panic("already started")
 	}
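ep.donec is now initialized at the top of Start rather than in Restart (the next hunk removes it there), so the done channel is reset on every start path, including a direct Stop/Start that never goes through Restart. Assembled from this hunk, the opening of Start becomes:

    func (ep *EtcdServerProcess) Start(ctx context.Context) error {
        ep.donec = make(chan struct{})
        if ep.proc != nil {
            panic("already started")
        }
        // (unchanged startup logic follows)
    }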
@@ -121,7 +122,6 @@ func (ep *EtcdServerProcess) Restart(ctx context.Context) error {
 	if err := ep.Stop(); err != nil {
 		return err
 	}
-	ep.donec = make(chan struct{})
 	err := ep.Start(ctx)
 	if err == nil {
 		ep.cfg.lg.Info("restarted server", zap.String("name", ep.cfg.Name))
@@ -135,10 +135,10 @@ func (ep *EtcdServerProcess) Stop() (err error) {
 		return nil
 	}
 	err = ep.proc.Stop()
+	ep.proc = nil
 	if err != nil {
 		return err
 	}
-	ep.proc = nil
 	<-ep.donec
 	ep.donec = make(chan struct{})
 	if ep.cfg.Purl.Scheme == "unix" || ep.cfg.Purl.Scheme == "unixs" {
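Moving ep.proc = nil ahead of the error check clears the process handle even when proc.Stop() fails, so a later Start cannot trip the panic("already started") guard on a stale handle. Post-patch, the sequence reads:

    err = ep.proc.Stop()
    ep.proc = nil // clear the handle even if Stop returned an error
    if err != nil {
        return err
    }
    <-ep.donec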
@@ -44,11 +44,13 @@ func (e integrationRunner) BeforeTest(t testing.TB) {

 func (e integrationRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.ClusterConfig) Cluster {
 	var err error
-	var integrationCfg integration.ClusterConfig
-	integrationCfg.Size = cfg.ClusterSize
+	integrationCfg := integration.ClusterConfig{
+		Size:                       cfg.ClusterSize,
+		QuotaBackendBytes:          cfg.QuotaBackendBytes,
+		DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck,
+		SnapshotCount:              uint64(cfg.SnapshotCount),
+	}
 	integrationCfg.ClientTLS, err = tlsInfo(t, cfg.ClientTLS)
-	integrationCfg.QuotaBackendBytes = cfg.QuotaBackendBytes
-	integrationCfg.DisableStrictReconfigCheck = cfg.DisableStrictReconfigCheck
 	if err != nil {
 		t.Fatalf("ClientTLS: %s", err)
 	}
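Assembled from the hunk, the integration runner now builds its configuration as a single composite literal, mirroring the e2e runner above and picking up the new SnapshotCount field (note the int-to-uint64 conversion):

    integrationCfg := integration.ClusterConfig{
        Size:                       cfg.ClusterSize,
        QuotaBackendBytes:          cfg.QuotaBackendBytes,
        DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck,
        SnapshotCount:              uint64(cfg.SnapshotCount),
    }
    integrationCfg.ClientTLS, err = tlsInfo(t, cfg.ClientTLS)
    if err != nil {
        t.Fatalf("ClientTLS: %s", err)
    }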