mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
backend: set seq flag for each bucket buffer
This commit is contained in:
parent
932d42b027
commit
79eafb9719
@ -254,7 +254,7 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
|
|||||||
batchTx: batchTx{backend: backend},
|
batchTx: batchTx{backend: backend},
|
||||||
buf: txWriteBuffer{
|
buf: txWriteBuffer{
|
||||||
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
|
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
|
||||||
seq: true,
|
seq: make(map[string]bool),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
tx.Commit()
|
tx.Commit()
|
||||||
|
@ -37,11 +37,12 @@ func (txb *txBuffer) reset() {
|
|||||||
// txWriteBuffer buffers writes of pending updates that have not yet committed.
|
// txWriteBuffer buffers writes of pending updates that have not yet committed.
|
||||||
type txWriteBuffer struct {
|
type txWriteBuffer struct {
|
||||||
txBuffer
|
txBuffer
|
||||||
seq bool
|
seq map[string]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (txw *txWriteBuffer) put(bucket, k, v []byte) {
|
func (txw *txWriteBuffer) put(bucket, k, v []byte) {
|
||||||
txw.seq = false
|
txw.seq[string(bucket)] = false
|
||||||
txw.putSeq(bucket, k, v)
|
txw.putSeq(bucket, k, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,6 +55,18 @@ func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
|
|||||||
b.add(k, v)
|
b.add(k, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (txw *txWriteBuffer) reset() {
|
||||||
|
txw.txBuffer.reset()
|
||||||
|
for k := range txw.seq {
|
||||||
|
v, ok := txw.buckets[k]
|
||||||
|
if !ok {
|
||||||
|
delete(txw.seq, k)
|
||||||
|
} else if v.used == 0 {
|
||||||
|
txw.seq[k] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
|
func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
|
||||||
for k, wb := range txw.buckets {
|
for k, wb := range txw.buckets {
|
||||||
rb, ok := txr.buckets[k]
|
rb, ok := txr.buckets[k]
|
||||||
@ -62,7 +75,7 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
|
|||||||
txr.buckets[k] = wb
|
txr.buckets[k] = wb
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !txw.seq && wb.used > 1 {
|
if seq, ok := txw.seq[k]; ok && !seq && wb.used > 1 {
|
||||||
// assume no duplicate keys
|
// assume no duplicate keys
|
||||||
sort.Sort(wb)
|
sort.Sort(wb)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user