mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

After the upgrade of boltdb, db.Close waits for all txns to finish. CommitAndStop commits the current txn and stops creating new ones.
95 lines
1.7 KiB
Go
95 lines
1.7 KiB
Go
package backend
|
|
|
|
import (
|
|
"io"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
|
|
)
|
|
|
|
type Backend interface {
|
|
BatchTx() BatchTx
|
|
Snapshot(w io.Writer) (n int64, err error)
|
|
ForceCommit()
|
|
Close() error
|
|
}
|
|
|
|
type backend struct {
|
|
db *bolt.DB
|
|
|
|
batchInterval time.Duration
|
|
batchLimit int
|
|
batchTx *batchTx
|
|
|
|
stopc chan struct{}
|
|
startc chan struct{}
|
|
donec chan struct{}
|
|
}
|
|
|
|
func New(path string, d time.Duration, limit int) Backend {
|
|
db, err := bolt.Open(path, 0600, nil)
|
|
if err != nil {
|
|
log.Panicf("backend: cannot open database at %s (%v)", path, err)
|
|
}
|
|
|
|
b := &backend{
|
|
db: db,
|
|
|
|
batchInterval: d,
|
|
batchLimit: limit,
|
|
batchTx: &batchTx{},
|
|
|
|
stopc: make(chan struct{}),
|
|
startc: make(chan struct{}),
|
|
donec: make(chan struct{}),
|
|
}
|
|
b.batchTx.backend = b
|
|
go b.run()
|
|
<-b.startc
|
|
return b
|
|
}
|
|
|
|
// BatchTx returns the current batch tx in coalescer. The tx can be used for read and
|
|
// write operations. The write result can be retrieved within the same tx immediately.
|
|
// The write result is isolated with other txs until the current one get committed.
|
|
func (b *backend) BatchTx() BatchTx {
|
|
return b.batchTx
|
|
}
|
|
|
|
// force commit the current batching tx.
|
|
func (b *backend) ForceCommit() {
|
|
b.batchTx.Commit()
|
|
}
|
|
|
|
func (b *backend) Snapshot(w io.Writer) (n int64, err error) {
|
|
b.db.View(func(tx *bolt.Tx) error {
|
|
n, err = tx.WriteTo(w)
|
|
return nil
|
|
})
|
|
return n, err
|
|
}
|
|
|
|
func (b *backend) run() {
|
|
defer close(b.donec)
|
|
|
|
b.batchTx.Commit()
|
|
b.startc <- struct{}{}
|
|
|
|
for {
|
|
select {
|
|
case <-time.After(b.batchInterval):
|
|
case <-b.stopc:
|
|
b.batchTx.CommitAndStop()
|
|
return
|
|
}
|
|
b.batchTx.Commit()
|
|
}
|
|
}
|
|
|
|
func (b *backend) Close() error {
|
|
close(b.stopc)
|
|
<-b.donec
|
|
return b.db.Close()
|
|
}
|