mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
mvcc/backend: clean up mutex, logging
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
dd1baf6e96
commit
58e3ead219
@ -183,15 +183,15 @@ func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error)
|
|||||||
// Commit commits a previous tx and begins a new writable one.
|
// Commit commits a previous tx and begins a new writable one.
|
||||||
func (t *batchTx) Commit() {
|
func (t *batchTx) Commit() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(false)
|
t.commit(false)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// CommitAndStop commits the previous tx and does not create a new one.
|
// CommitAndStop commits the previous tx and does not create a new one.
|
||||||
func (t *batchTx) CommitAndStop() {
|
func (t *batchTx) CommitAndStop() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(true)
|
t.commit(true)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTx) Unlock() {
|
func (t *batchTx) Unlock() {
|
||||||
@ -215,19 +215,18 @@ func (t *batchTx) commit(stop bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
// gofail: var beforeCommit struct{}
|
// gofail: var beforeCommit struct{}
|
||||||
err := t.tx.Commit()
|
err := t.tx.Commit()
|
||||||
// gofail: var afterCommit struct{}
|
// gofail: var afterCommit struct{}
|
||||||
|
|
||||||
commitDurations.Observe(time.Since(start).Seconds())
|
commitDurations.Observe(time.Since(start).Seconds())
|
||||||
atomic.AddInt64(&t.backend.commits, 1)
|
atomic.AddInt64(&t.backend.commits, 1)
|
||||||
|
|
||||||
t.pending = 0
|
t.pending = 0
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if t.backend.lg != nil {
|
if t.backend.lg != nil {
|
||||||
t.backend.lg.Fatal(
|
t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
|
||||||
"failed to commit tx",
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
plog.Fatalf("cannot commit tx (%s)", err)
|
plog.Fatalf("cannot commit tx (%s)", err)
|
||||||
}
|
}
|
||||||
@ -269,31 +268,28 @@ func (t *batchTxBuffered) Unlock() {
|
|||||||
|
|
||||||
func (t *batchTxBuffered) Commit() {
|
func (t *batchTxBuffered) Commit() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(false)
|
t.commit(false)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) CommitAndStop() {
|
func (t *batchTxBuffered) CommitAndStop() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(true)
|
t.commit(true)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) commit(stop bool) {
|
func (t *batchTxBuffered) commit(stop bool) {
|
||||||
// all read txs must be closed to acquire boltdb commit rwlock
|
// all read txs must be closed to acquire boltdb commit rwlock
|
||||||
t.backend.readTx.mu.Lock()
|
t.backend.readTx.mu.Lock()
|
||||||
defer t.backend.readTx.mu.Unlock()
|
|
||||||
t.unsafeCommit(stop)
|
t.unsafeCommit(stop)
|
||||||
|
t.backend.readTx.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||||
if t.backend.readTx.tx != nil {
|
if t.backend.readTx.tx != nil {
|
||||||
if err := t.backend.readTx.tx.Rollback(); err != nil {
|
if err := t.backend.readTx.tx.Rollback(); err != nil {
|
||||||
if t.backend.lg != nil {
|
if t.backend.lg != nil {
|
||||||
t.backend.lg.Fatal(
|
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
|
||||||
"failed to rollback tx",
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
plog.Fatalf("cannot rollback tx (%s)", err)
|
plog.Fatalf("cannot rollback tx (%s)", err)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user