diff --git a/database/ffldb/ff/read.go b/database/ffldb/ff/read.go index 5c7d1b6bc..e8ce811e9 100644 --- a/database/ffldb/ff/read.go +++ b/database/ffldb/ff/read.go @@ -33,10 +33,11 @@ func (s *flatFileStore) read(location *flatFileLocation) ([]byte, error) { if err != nil { return nil, err } + flatFile.RLock() + defer flatFile.RUnlock() data := make([]byte, location.dataLength) n, err := flatFile.file.ReadAt(data, int64(location.fileOffset)) - flatFile.RUnlock() if err != nil { return nil, errors.Wrapf(err, "failed to read data in store '%s' "+ "from file %d, offset %d", s.storeName, location.fileNumber, @@ -62,43 +63,31 @@ func (s *flatFileStore) read(location *flatFileLocation) ([]byte, error) { // will also open the file when it's not already open subject to the rules // described in openFile. Also handles closing files as needed to avoid going // over the max allowed open files. -// -// NOTE: The returned flat file will already have the read lock acquired and -// the caller MUST call .RUnlock() to release it once it has finished all read -// operations. This is necessary because otherwise it would be possible for a -// separate goroutine to close the file after it is returned from here, but -// before the caller has acquired a read lock. func (s *flatFileStore) flatFile(fileNumber uint32) (*lockableFile, error) { // When the requested flat file is open for writes, return it. s.writeCursor.RLock() + defer s.writeCursor.RUnlock() if fileNumber == s.writeCursor.currentFileNumber && s.writeCursor.currentFile.file != nil { openFile := s.writeCursor.currentFile - openFile.RLock() - s.writeCursor.RUnlock() return openFile, nil } - s.writeCursor.RUnlock() // Try to return an open file under the overall files read lock. s.openFilesMutex.RLock() + defer s.openFilesMutex.RUnlock() if openFile, ok := s.openFiles[fileNumber]; ok { s.lruMutex.Lock() - s.openFilesLRU.MoveToFront(s.fileNumberToLRUElement[fileNumber]) - s.lruMutex.Unlock() + defer s.lruMutex.Unlock() + + s.openFilesLRU.MoveToFront(s.fileNumberToLRUElement[fileNumber]) - openFile.RLock() - s.openFilesMutex.RUnlock() return openFile, nil } - s.openFilesMutex.RUnlock() // Since the file isn't open already, need to check the open files map // again under write lock in case multiple readers got here and a // separate one is already opening the file. - s.openFilesMutex.Lock() if openFlatFile, ok := s.openFiles[fileNumber]; ok { - openFlatFile.RLock() - s.openFilesMutex.Unlock() return openFlatFile, nil } @@ -106,11 +95,8 @@ func (s *flatFileStore) flatFile(fileNumber uint32) (*lockableFile, error) { // recently used one as needed. openFile, err := s.openFile(fileNumber) if err != nil { - s.openFilesMutex.Unlock() return nil, err } - openFile.RLock() - s.openFilesMutex.Unlock() return openFile, nil } @@ -142,6 +128,7 @@ func (s *flatFileStore) openFile(fileNumber uint32) (*lockableFile, error) { // recently used list to indicate it is the most recently used file and // therefore should be closed last. s.lruMutex.Lock() + defer s.lruMutex.Unlock() lruList := s.openFilesLRU if lruList.Len() >= maxOpenFiles { lruFileNumber := lruList.Remove(lruList.Back()).(uint32) @@ -151,14 +138,13 @@ func (s *flatFileStore) openFile(fileNumber uint32) (*lockableFile, error) { // any readers are currently reading from it so it's not closed // out from under them. oldFile.Lock() + defer oldFile.Unlock() _ = oldFile.file.Close() - oldFile.Unlock() delete(s.openFiles, lruFileNumber) delete(s.fileNumberToLRUElement, lruFileNumber) } s.fileNumberToLRUElement[fileNumber] = lruList.PushFront(fileNumber) - s.lruMutex.Unlock() // Store a reference to it in the open files map. s.openFiles[fileNumber] = flatFile diff --git a/database/ffldb/ff/rollback.go b/database/ffldb/ff/rollback.go index e11675f49..2da000ecb 100644 --- a/database/ffldb/ff/rollback.go +++ b/database/ffldb/ff/rollback.go @@ -64,12 +64,7 @@ func (s *flatFileStore) rollback(targetLocation *flatFileLocation) error { // Close the current write file if it needs to be deleted. if s.writeCursor.currentFileNumber > targetFileNumber { - s.writeCursor.currentFile.Lock() - if s.writeCursor.currentFile.file != nil { - s.writeCursor.currentFile.file.Close() - s.writeCursor.currentFile.file = nil - } - s.writeCursor.currentFile.Unlock() + s.closeCurrentWriteCursorFile() } // Delete all files that are newer than the provided rollback file @@ -90,10 +85,10 @@ func (s *flatFileStore) rollback(targetLocation *flatFileLocation) error { // Open the file for the current write cursor if needed. s.writeCursor.currentFile.Lock() + defer s.writeCursor.currentFile.Unlock() if s.writeCursor.currentFile.file == nil { openFile, err := s.openWriteFile(s.writeCursor.currentFileNumber) if err != nil { - s.writeCursor.currentFile.Unlock() return err } s.writeCursor.currentFile.file = openFile @@ -102,14 +97,12 @@ func (s *flatFileStore) rollback(targetLocation *flatFileLocation) error { // Truncate the file to the provided target offset. err := s.writeCursor.currentFile.file.Truncate(int64(targetFileOffset)) if err != nil { - s.writeCursor.currentFile.Unlock() return errors.Wrapf(err, "ROLLBACK: Failed to truncate file %d "+ "in store '%s'", s.writeCursor.currentFileNumber, s.storeName) } // Sync the file to disk. err = s.writeCursor.currentFile.file.Sync() - s.writeCursor.currentFile.Unlock() if err != nil { return errors.Wrapf(err, "ROLLBACK: Failed to sync file %d in "+ "store '%s'", s.writeCursor.currentFileNumber, s.storeName) diff --git a/database/ffldb/ff/write.go b/database/ffldb/ff/write.go index 897fa2da2..362fbc46e 100644 --- a/database/ffldb/ff/write.go +++ b/database/ffldb/ff/write.go @@ -47,18 +47,16 @@ func (s *flatFileStore) write(data []byte) (*flatFileLocation, error) { // with LRU tracking. The close is done under the write lock // for the file to prevent it from being closed out from under // any readers currently reading from it. - cursor.Lock() - cursor.currentFile.Lock() - if cursor.currentFile.file != nil { - _ = cursor.currentFile.file.Close() - cursor.currentFile.file = nil - } - cursor.currentFile.Unlock() + func() { + cursor.Lock() + defer cursor.Unlock() - // Start writes into next file. - cursor.currentFileNumber++ - cursor.currentOffset = 0 - cursor.Unlock() + s.closeCurrentWriteCursorFile() + + // Start writes into next file. + cursor.currentFileNumber++ + cursor.currentOffset = 0 + }() } // All writes are done under the write lock for the file to ensure any @@ -164,3 +162,15 @@ func (s *flatFileStore) writeData(data []byte, fieldName string) error { return nil } + +// closeCurrentWriteCursorFile closes the currently open writeCursor file if +// it's open. +// This method MUST be called with the writeCursor lock held for writes. +func (s *flatFileStore) closeCurrentWriteCursorFile() { + s.writeCursor.currentFile.Lock() + defer s.writeCursor.currentFile.Unlock() + if s.writeCursor.currentFile.file != nil { + _ = s.writeCursor.currentFile.file.Close() + s.writeCursor.currentFile.file = nil + } +}