Fix deadlock on cache mutex

This commit is contained in:
kssenii 2023-01-09 15:57:54 +01:00
parent 539f4fde1c
commit 0c3d2f1dc1
3 changed files with 53 additions and 112 deletions

View File

@ -351,12 +351,9 @@ void FileCache::fillHolesWithEmptyFileSegments(
} }
} }
KeyTransactionPtr FileCache::createKeyTransaction(const Key & key, KeyNotFoundPolicy key_not_found_policy, bool assert_initialized) KeyTransactionPtr FileCache::createKeyTransaction(const Key & key, KeyNotFoundPolicy key_not_found_policy)
{ {
auto lock = cache_guard.lock(); std::lock_guard lock(key_locks_and_files_mutex);
if (assert_initialized)
assertInitializedUnlocked(lock);
auto it = files.find(key); auto it = files.find(key);
if (it == files.end()) if (it == files.end())
@ -391,6 +388,8 @@ KeyTransactionPtr FileCache::createKeyTransaction(const Key & key, KeyNotFoundPo
FileSegmentsHolderPtr FileCache::set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings) FileSegmentsHolderPtr FileCache::set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings)
{ {
assertInitialized();
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY);
FileSegment::Range range(offset, offset + size - 1); FileSegment::Range range(offset, offset + size - 1);
@ -416,6 +415,8 @@ FileSegmentsHolderPtr FileCache::getOrSet(
size_t size, size_t size,
const CreateFileSegmentSettings & settings) const CreateFileSegmentSettings & settings)
{ {
assertInitialized();
FileSegment::Range range(offset, offset + size - 1); FileSegment::Range range(offset, offset + size - 1);
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY);
@ -439,12 +440,14 @@ FileSegmentsHolderPtr FileCache::getOrSet(
FileSegmentsHolderPtr FileCache::get(const Key & key, size_t offset, size_t size) FileSegmentsHolderPtr FileCache::get(const Key & key, size_t offset, size_t size)
{ {
/// Get all segments which intersect with the given range. assertInitialized();
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL);
if (key_transaction) if (key_transaction)
{ {
FileSegment::Range range(offset, offset + size - 1); FileSegment::Range range(offset, offset + size - 1);
/// Get all segments which intersect with the given range.
auto file_segments = getImpl(key, range, *key_transaction); auto file_segments = getImpl(key, range, *key_transaction);
if (!file_segments.empty()) if (!file_segments.empty())
{ {
@ -533,6 +536,7 @@ FileCache::CacheCells::iterator FileCache::addCell(
bool FileCache::tryReserve(const Key & key, size_t offset, size_t size) bool FileCache::tryReserve(const Key & key, size_t offset, size_t size)
{ {
assertInitialized();
auto queue_lock = main_priority->lock(); auto queue_lock = main_priority->lock();
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::THROW); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::THROW);
return tryReserveUnlocked(key, offset, size, key_transaction, queue_lock); return tryReserveUnlocked(key, offset, size, key_transaction, queue_lock);
@ -859,6 +863,8 @@ bool FileCache::tryReserveInCache(
void FileCache::removeIfExists(const Key & key) void FileCache::removeIfExists(const Key & key)
{ {
assertInitialized();
auto queue_lock = main_priority->lock(); auto queue_lock = main_priority->lock();
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL);
if (!key_transaction) if (!key_transaction)
@ -899,6 +905,8 @@ void FileCache::removeIfExists(const Key & key)
void FileCache::removeAllReleasable() void FileCache::removeAllReleasable()
{ {
assertInitialized();
using QueueEntry = IFileCachePriority::Entry; using QueueEntry = IFileCachePriority::Entry;
using IterationResult = IFileCachePriority::IterationResult; using IterationResult = IFileCachePriority::IterationResult;
@ -1066,7 +1074,7 @@ void FileCache::loadCacheInfoIntoMemory()
continue; continue;
} }
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY, false); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY);
key_transaction->getOffsets().created_base_directory = true; key_transaction->getOffsets().created_base_directory = true;
if (tryReserveUnlocked(key, offset, size, key_transaction, queue_lock)) if (tryReserveUnlocked(key, offset, size, key_transaction, queue_lock))
@ -1083,7 +1091,7 @@ void FileCache::loadCacheInfoIntoMemory()
log, log,
"Cache capacity changed (max size: {}, used: {}), " "Cache capacity changed (max size: {}, used: {}), "
"cached file `{}` does not fit in cache anymore (size: {})", "cached file `{}` does not fit in cache anymore (size: {})",
max_size, getUsedCacheSize(), key_it->path().string(), size); max_size, main_priority->getCacheSize(queue_lock), key_it->path().string(), size);
fs::remove(offset_it->path()); fs::remove(offset_it->path());
} }
@ -1132,8 +1140,8 @@ void KeyTransaction::reduceSizeToDownloaded(
} }
assert(file_segment->downloaded_size <= file_segment->reserved_size); assert(file_segment->downloaded_size <= file_segment->reserved_size);
assert((*cell->queue_iterator).size == file_segment->reserved_size); assert((*cell->queue_iterator)->size == file_segment->reserved_size);
assert((*cell->queue_iterator).size >= file_segment->downloaded_size); assert((*cell->queue_iterator)->size >= file_segment->downloaded_size);
if (file_segment->reserved_size > file_segment->downloaded_size) if (file_segment->reserved_size > file_segment->downloaded_size)
{ {
@ -1147,17 +1155,20 @@ void KeyTransaction::reduceSizeToDownloaded(
FileSegment::State::DOWNLOADED, create_settings); FileSegment::State::DOWNLOADED, create_settings);
assert(file_segment->reserved_size == downloaded_size); assert(file_segment->reserved_size == downloaded_size);
assert(cell->size() == (*cell->queue_iterator).size); assert(cell->size() == (*cell->queue_iterator)->size);
} }
FileSegmentsHolderPtr FileCache::getSnapshot() FileSegmentsHolderPtr FileCache::getSnapshot()
{ {
assertInitialized();
std::lock_guard lock(key_locks_and_files_mutex);
FileSegments file_segments; FileSegments file_segments;
auto lock = cache_guard.lock(); for (const auto & [key, offsets] : files)
for (const auto & [key, _] : files)
{ {
auto key_file_segments = getSnapshot(key); for (const auto & [_, cell] : *offsets)
file_segments.insert(file_segments.end(), key_file_segments->begin(), key_file_segments->end()); file_segments.push_back(FileSegment::getSnapshot(cell.file_segment));
} }
return std::make_unique<FileSegmentsHolder>(std::move(file_segments)); return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
} }
@ -1173,6 +1184,7 @@ FileSegmentsHolderPtr FileCache::getSnapshot(const Key & key)
FileSegmentsHolderPtr FileCache::dumpQueue() FileSegmentsHolderPtr FileCache::dumpQueue()
{ {
assertInitialized();
using QueueEntry = IFileCachePriority::Entry; using QueueEntry = IFileCachePriority::Entry;
using IterationResult = IFileCachePriority::IterationResult; using IterationResult = IFileCachePriority::IterationResult;
@ -1190,10 +1202,12 @@ FileSegmentsHolderPtr FileCache::dumpQueue()
std::vector<String> FileCache::tryGetCachePaths(const Key & key) std::vector<String> FileCache::tryGetCachePaths(const Key & key)
{ {
auto lock = cache_guard.lock(); assertInitialized();
std::lock_guard lock(key_locks_and_files_mutex);
std::vector<String> cache_paths;
const auto & cells_by_offset = files[key]; const auto & cells_by_offset = files[key];
std::vector<String> cache_paths;
for (const auto & [offset, cell] : *cells_by_offset) for (const auto & [offset, cell] : *cells_by_offset)
{ {
@ -1311,85 +1325,36 @@ KeyTransaction::KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPt
{ {
} }
void FileCache::assertInitializedUnlocked(CacheGuard::Lock & lock) const void FileCache::assertInitialized() const
{ {
using State = InitializationState; if (is_initialized)
return;
switch (initialization_state) std::unique_lock lock(init_mutex);
{ if (is_initialized)
case State::NOT_INITIALIZED: return;
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache not initialized"); if (init_exception)
} std::rethrow_exception(init_exception);
case State::FAILED: if (!is_initialized)
{ throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache not initialized");
assert(initialization_exception);
std::rethrow_exception(initialization_exception);
}
case State::INITIALIZED:
{
return;
}
case State::INITIALIZING:
{
initialization_cv.wait(lock.lock, [this]() { return initialization_state != State::INITIALIZING; });
assertInitializedUnlocked(lock);
}
}
} }
void FileCache::initialize() void FileCache::initialize()
{ {
using State = InitializationState; std::lock_guard lock(init_mutex);
{
auto lock = cache_guard.lock();
switch (initialization_state)
{
case State::NOT_INITIALIZED:
{
if (!fs::exists(cache_base_path))
{
fs::create_directories(cache_base_path);
initialization_state = InitializationState::INITIALIZED;
return;
}
else
{
initialization_state = InitializationState::INITIALIZING;
}
break;
}
case State::FAILED:
{
assert(initialization_exception);
std::rethrow_exception(initialization_exception);
}
case State::INITIALIZED:
case State::INITIALIZING:
{
return;
}
}
}
try try
{ {
loadCacheInfoIntoMemory(); loadCacheInfoIntoMemory();
{
auto lock = cache_guard.lock();
initialization_state = State::INITIALIZED;
}
} }
catch (...) catch (...)
{ {
initialization_exception = std::current_exception(); init_exception = std::current_exception();
{
auto lock = cache_guard.lock();
initialization_state = State::FAILED;
}
throw; throw;
} }
is_initialized = true;
} }
void FileCache::assertCacheCorrectness() void FileCache::assertCacheCorrectness()

View File

@ -165,18 +165,9 @@ private:
}; };
using CacheCellsPtr = std::shared_ptr<CacheCells>; using CacheCellsPtr = std::shared_ptr<CacheCells>;
mutable CacheGuard cache_guard; std::exception_ptr init_exception;
std::atomic<bool> is_initialized = false;
enum class InitializationState mutable std::mutex init_mutex;
{
NOT_INITIALIZED,
INITIALIZING,
INITIALIZED,
FAILED,
};
InitializationState initialization_state = InitializationState::NOT_INITIALIZED;
mutable std::condition_variable initialization_cv;
std::exception_ptr initialization_exception;
using CachedFiles = std::unordered_map<Key, CacheCellsPtr>; using CachedFiles = std::unordered_map<Key, CacheCellsPtr>;
CachedFiles files; CachedFiles files;
@ -185,6 +176,8 @@ private:
using KeysLocksMap = std::unordered_map<KeyPrefix, KeyPrefixGuardPtr>; using KeysLocksMap = std::unordered_map<KeyPrefix, KeyPrefixGuardPtr>;
KeysLocksMap keys_locks; KeysLocksMap keys_locks;
mutable std::mutex key_locks_and_files_mutex; /// Protects `files` and `keys_locks`
enum class KeyNotFoundPolicy enum class KeyNotFoundPolicy
{ {
THROW, THROW,
@ -192,7 +185,7 @@ private:
RETURN_NULL, RETURN_NULL,
}; };
KeyTransactionPtr createKeyTransaction(const Key & key, KeyNotFoundPolicy key_not_found_policy, bool assert_initialized = true); KeyTransactionPtr createKeyTransaction(const Key & key, KeyNotFoundPolicy key_not_found_policy);
KeyTransactionCreatorPtr getKeyTransactionCreator(const Key & key, KeyTransaction & key_transaction); KeyTransactionCreatorPtr getKeyTransactionCreator(const Key & key, KeyTransaction & key_transaction);
@ -285,7 +278,7 @@ public:
}; };
private: private:
void assertInitializedUnlocked(CacheGuard::Lock &) const; void assertInitialized() const;
void loadCacheInfoIntoMemory(); void loadCacheInfoIntoMemory();

View File

@ -11,23 +11,6 @@ namespace DB
* Cache priority queue guard > key prefix guard > file segment guard. * Cache priority queue guard > key prefix guard > file segment guard.
*/ */
/**
* Guard for the whole the cache.
* Used to get a lock per key prefix and then continue with only key prefix locked.
*/
struct CacheGuard
{
struct Lock
{
explicit Lock(CacheGuard & guard) : lock(guard.mutex) {}
std::unique_lock<std::mutex> lock;
};
std::mutex mutex;
Lock lock() { return Lock(*this); }
};
/** /**
* Guard for a set of keys. * Guard for a set of keys.
* One guard per key prefix (first three digits of the path hash). * One guard per key prefix (first three digits of the path hash).