diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go
index be2cf4cb9..b84cdc2a9 100644
--- a/storage/backend/batch_tx.go
+++ b/storage/backend/batch_tx.go
@@ -28,6 +28,7 @@ type BatchTx interface {
 	Unlock()
 	UnsafeCreateBucket(name []byte)
 	UnsafePut(bucketName []byte, key []byte, value []byte)
+	UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
 	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
 	UnsafeDelete(bucketName []byte, key []byte)
 	Commit()
@@ -57,10 +58,24 @@ func (t *batchTx) UnsafeCreateBucket(name []byte) {
 
 // UnsafePut must be called holding the lock on the tx.
 func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
+	t.unsafePut(bucketName, key, value, false)
+}
+
+// UnsafeSeqPut must be called holding the lock on the tx.
+func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+	t.unsafePut(bucketName, key, value, true)
+}
+
+func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
 	bucket := t.tx.Bucket(bucketName)
 	if bucket == nil {
 		log.Fatalf("storage: bucket %s does not exist", string(bucketName))
 	}
+	if seq {
+		// it is useful to increase fill percent when the workloads are mostly append-only.
+		// this can delay the page split and reduce space usage.
+		bucket.FillPercent = 0.9
+	}
 	if err := bucket.Put(key, value); err != nil {
 		log.Fatalf("storage: cannot put key into bucket (%v)", err)
 	}
diff --git a/storage/kvstore.go b/storage/kvstore.go
index 238715484..69f3f39db 100644
--- a/storage/kvstore.go
+++ b/storage/kvstore.go
@@ -453,7 +453,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
 		log.Fatalf("storage: cannot marshal event: %v", err)
 	}
 
-	s.tx.UnsafePut(keyBucketName, ibytes, d)
+	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
 	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
 	s.changes = append(s.changes, kv)
 	s.currentRev.sub += 1
@@ -514,7 +514,7 @@ func (s *store) delete(key []byte, rev revision) {
 		log.Fatalf("storage: cannot marshal event: %v", err)
 	}
 
-	s.tx.UnsafePut(keyBucketName, ibytes, d)
+	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
 	err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
 	if err != nil {
 		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go
index e9d449ec7..4b9660cde 100644
--- a/storage/kvstore_test.go
+++ b/storage/kvstore_test.go
@@ -139,13 +139,13 @@ func TestStorePut(t *testing.T) {
 		}
 
 		wact := []testutil.Action{
-			{"put", []interface{}{keyBucketName, tt.wkey, data}},
+			{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
 		}
 		if tt.rr != nil {
 			wact = []testutil.Action{
 				{"range", []interface{}{keyBucketName, newTestKeyBytes(tt.r.rev, false), []byte(nil), int64(0)}},
-				{"put", []interface{}{keyBucketName, tt.wkey, data}},
+				{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
 			}
 		}
 
@@ -305,7 +305,7 @@ func TestStoreDeleteRange(t *testing.T) {
 			t.Errorf("#%d: marshal err = %v, want nil", i, err)
 		}
 		wact := []testutil.Action{
-			{"put", []interface{}{keyBucketName, tt.wkey, data}},
+			{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
 			{"range", []interface{}{keyBucketName, newTestKeyBytes(revision{2, 0}, false), []byte(nil), int64(0)}},
 		}
 		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
@@ -573,6 +573,9 @@ func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
 func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
 	b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
 }
+func (b *fakeBatchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+	b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucketName, key, value}})
+}
 func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
 	b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucketName, key, endKey, limit}})
 	r := <-b.rangeRespc
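
Not part of the patch: a minimal standalone sketch of the boltdb behavior the FillPercent change relies on. It assumes the bolt package this backend uses (github.com/boltdb/bolt); the database path "example.db" and bucket name "demo" are hypothetical, chosen only for illustration. Keys are written in strictly increasing order, which is the append-only pattern UnsafeSeqPut targets.

// Sketch only, not part of the patch above.
package main

import (
	"encoding/binary"
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("example.db", 0600, nil) // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("demo")) // hypothetical bucket
		if err != nil {
			return err
		}
		// The keys below are strictly increasing (like etcd's revision-ordered
		// keys), so raising FillPercent from the default 0.5 lets each leaf page
		// fill closer to capacity before it splits, delaying page splits and
		// reducing space usage for append-only workloads.
		b.FillPercent = 0.9
		for i := uint64(0); i < 1000; i++ {
			k := make([]byte, 8)
			binary.BigEndian.PutUint64(k, i)
			if err := b.Put(k, []byte("v")); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		log.Fatal(err)
	}
}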