mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 02:12:21 +00:00
Use key locks instead of key prefix locks
This commit is contained in:
parent
0c3d2f1dc1
commit
7fa27ddc7f
@ -73,22 +73,36 @@ String FileCache::getPathInLocalCache(const Key & key) const
|
||||
return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
|
||||
}
|
||||
|
||||
void FileCache::removeKeyDirectoryIfExists(const Key & key, const KeyPrefixGuard::Lock &) const
|
||||
void FileCache::assertInitialized() const
|
||||
{
|
||||
/// Note: it is guaranteed that there is no concurrency here with files deletion
|
||||
/// because cache key directories are create only in FileCache class under cache_lock.
|
||||
|
||||
auto key_str = key.toString();
|
||||
auto key_prefix_path = fs::path(cache_base_path) / key_str.substr(0, 3);
|
||||
auto key_path = key_prefix_path / key_str;
|
||||
|
||||
if (!fs::exists(key_path))
|
||||
if (is_initialized)
|
||||
return;
|
||||
|
||||
fs::remove_all(key_path);
|
||||
std::unique_lock lock(init_mutex);
|
||||
if (is_initialized)
|
||||
return;
|
||||
|
||||
if (fs::is_empty(key_prefix_path))
|
||||
fs::remove(key_prefix_path);
|
||||
if (init_exception)
|
||||
std::rethrow_exception(init_exception);
|
||||
if (!is_initialized)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache not initialized");
|
||||
}
|
||||
|
||||
void FileCache::initialize()
|
||||
{
|
||||
std::lock_guard lock(init_mutex);
|
||||
|
||||
try
|
||||
{
|
||||
loadMetadata();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
init_exception = std::current_exception();
|
||||
throw;
|
||||
}
|
||||
|
||||
is_initialized = true;
|
||||
}
|
||||
|
||||
static bool isQueryInitialized()
|
||||
@ -353,7 +367,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
|
||||
|
||||
KeyTransactionPtr FileCache::createKeyTransaction(const Key & key, KeyNotFoundPolicy key_not_found_policy)
|
||||
{
|
||||
std::lock_guard lock(key_locks_and_files_mutex);
|
||||
std::lock_guard lock(files_mutex);
|
||||
|
||||
auto it = files.find(key);
|
||||
if (it == files.end())
|
||||
@ -371,19 +385,13 @@ KeyTransactionPtr FileCache::createKeyTransaction(const Key & key, KeyNotFoundPo
|
||||
}
|
||||
case KeyNotFoundPolicy::CREATE_EMPTY:
|
||||
{
|
||||
it = files.emplace(key, std::make_shared<CacheCells>()).first;
|
||||
it = files.emplace(key, CachedFilesMetadata()).first;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto lock_it = keys_locks.find(key.key_prefix);
|
||||
if (lock_it == keys_locks.end())
|
||||
{
|
||||
lock_it = keys_locks.emplace(key.key_prefix, std::make_shared<KeyPrefixGuard>()).first;
|
||||
}
|
||||
|
||||
return std::make_unique<KeyTransaction>(lock_it->second, it->second);
|
||||
return std::make_unique<KeyTransaction>(it->second.guard, it->second.cells);
|
||||
}
|
||||
|
||||
FileSegmentsHolderPtr FileCache::set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings)
|
||||
@ -478,7 +486,6 @@ FileCache::CacheCells::iterator FileCache::addCell(
|
||||
/// Create a file segment cell and put it in `files` map by [key][offset].
|
||||
|
||||
chassert(size > 0); /// Empty cells are not allowed.
|
||||
chassert(!key.key_prefix.empty());
|
||||
|
||||
auto it = key_transaction.getOffsets().find(offset);
|
||||
if (it != key_transaction.getOffsets().end())
|
||||
@ -719,7 +726,7 @@ void FileCache::iterateAndCollectKeyLocks(
|
||||
{
|
||||
KeyTransactionPtr current;
|
||||
|
||||
auto locked_it = locked_map.find(entry.key.key_prefix);
|
||||
auto locked_it = locked_map.find(entry.key);
|
||||
const bool locked = locked_it != locked_map.end();
|
||||
if (locked)
|
||||
current = locked_it->second;
|
||||
@ -728,7 +735,7 @@ void FileCache::iterateAndCollectKeyLocks(
|
||||
|
||||
auto res = func(entry, *current);
|
||||
if (res.lock_key && !locked)
|
||||
locked_map.emplace(entry.key.key_prefix, current);
|
||||
locked_map.emplace(entry.key, current);
|
||||
|
||||
return res.iteration_result;
|
||||
}, queue_lock);
|
||||
@ -763,7 +770,7 @@ bool FileCache::tryReserveInCache(
|
||||
};
|
||||
|
||||
KeyTransactionsMap locked;
|
||||
locked[key.key_prefix] = key_transaction;
|
||||
locked[key] = key_transaction;
|
||||
|
||||
using QueueEntry = IFileCachePriority::Entry;
|
||||
using IterationResult = IFileCachePriority::IterationResult;
|
||||
@ -861,6 +868,25 @@ bool FileCache::tryReserveInCache(
|
||||
return true;
|
||||
}
|
||||
|
||||
void FileCache::removeKeyDirectoryIfExists(const Key & key, const KeyGuard::Lock &) const
|
||||
{
|
||||
/// Note: it is guaranteed that there is no concurrency here with files deletion
|
||||
/// because cache key directories are create only in FileCache class under cache_lock.
|
||||
|
||||
auto key_str = key.toString();
|
||||
auto key_prefix_path = fs::path(cache_base_path) / key_str.substr(0, 3);
|
||||
auto key_path = key_prefix_path / key_str;
|
||||
|
||||
if (!fs::exists(key_path))
|
||||
return;
|
||||
|
||||
fs::remove_all(key_path);
|
||||
|
||||
if (fs::is_empty(key_prefix_path))
|
||||
fs::remove(key_prefix_path);
|
||||
}
|
||||
|
||||
|
||||
void FileCache::removeIfExists(const Key & key)
|
||||
{
|
||||
assertInitialized();
|
||||
@ -993,7 +1019,7 @@ void KeyTransaction::remove(
|
||||
}
|
||||
}
|
||||
|
||||
void FileCache::loadCacheInfoIntoMemory()
|
||||
void FileCache::loadMetadata()
|
||||
{
|
||||
auto queue_lock = main_priority->lock();
|
||||
|
||||
@ -1139,9 +1165,10 @@ void KeyTransaction::reduceSizeToDownloaded(
|
||||
file_segment->getInfoForLogUnlocked(segment_lock));
|
||||
}
|
||||
|
||||
[[maybe_unused]] const auto & entry = **cell->queue_iterator;
|
||||
assert(file_segment->downloaded_size <= file_segment->reserved_size);
|
||||
assert((*cell->queue_iterator)->size == file_segment->reserved_size);
|
||||
assert((*cell->queue_iterator)->size >= file_segment->downloaded_size);
|
||||
assert(entry.size == file_segment->reserved_size);
|
||||
assert(entry.size >= file_segment->downloaded_size);
|
||||
|
||||
if (file_segment->reserved_size > file_segment->downloaded_size)
|
||||
{
|
||||
@ -1155,19 +1182,19 @@ void KeyTransaction::reduceSizeToDownloaded(
|
||||
FileSegment::State::DOWNLOADED, create_settings);
|
||||
|
||||
assert(file_segment->reserved_size == downloaded_size);
|
||||
assert(cell->size() == (*cell->queue_iterator)->size);
|
||||
assert(cell->size() == entry.size);
|
||||
}
|
||||
|
||||
FileSegmentsHolderPtr FileCache::getSnapshot()
|
||||
{
|
||||
assertInitialized();
|
||||
|
||||
std::lock_guard lock(key_locks_and_files_mutex);
|
||||
std::lock_guard lock(files_mutex);
|
||||
|
||||
FileSegments file_segments;
|
||||
for (const auto & [key, offsets] : files)
|
||||
for (const auto & [key, metadata] : files)
|
||||
{
|
||||
for (const auto & [_, cell] : *offsets)
|
||||
for (const auto & [_, cell] : *metadata.cells)
|
||||
file_segments.push_back(FileSegment::getSnapshot(cell.file_segment));
|
||||
}
|
||||
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
|
||||
@ -1204,9 +1231,9 @@ std::vector<String> FileCache::tryGetCachePaths(const Key & key)
|
||||
{
|
||||
assertInitialized();
|
||||
|
||||
std::lock_guard lock(key_locks_and_files_mutex);
|
||||
std::lock_guard lock(files_mutex);
|
||||
|
||||
const auto & cells_by_offset = files[key];
|
||||
const auto & cells_by_offset = files[key].cells;
|
||||
std::vector<String> cache_paths;
|
||||
|
||||
for (const auto & [offset, cell] : *cells_by_offset)
|
||||
@ -1317,7 +1344,7 @@ std::string FileCache::CacheCells::toString() const
|
||||
return result;
|
||||
}
|
||||
|
||||
KeyTransaction::KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_)
|
||||
KeyTransaction::KeyTransaction(KeyGuardPtr guard_, FileCache::CacheCellsPtr offsets_)
|
||||
: guard(guard_)
|
||||
, lock(guard->lock())
|
||||
, offsets(offsets_)
|
||||
@ -1325,43 +1352,11 @@ KeyTransaction::KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPt
|
||||
{
|
||||
}
|
||||
|
||||
void FileCache::assertInitialized() const
|
||||
{
|
||||
if (is_initialized)
|
||||
return;
|
||||
|
||||
std::unique_lock lock(init_mutex);
|
||||
if (is_initialized)
|
||||
return;
|
||||
|
||||
if (init_exception)
|
||||
std::rethrow_exception(init_exception);
|
||||
if (!is_initialized)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache not initialized");
|
||||
}
|
||||
|
||||
void FileCache::initialize()
|
||||
{
|
||||
std::lock_guard lock(init_mutex);
|
||||
|
||||
try
|
||||
{
|
||||
loadCacheInfoIntoMemory();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
init_exception = std::current_exception();
|
||||
throw;
|
||||
}
|
||||
|
||||
is_initialized = true;
|
||||
}
|
||||
|
||||
void FileCache::assertCacheCorrectness()
|
||||
{
|
||||
for (const auto & [key, cells_by_offset] : files)
|
||||
for (const auto & [key, metadata] : files)
|
||||
{
|
||||
for (const auto & [_, cell] : *cells_by_offset)
|
||||
for (const auto & [_, cell] : *metadata.cells)
|
||||
{
|
||||
const auto & file_segment = cell.file_segment;
|
||||
file_segment->assertCorrectness();
|
||||
|
@ -27,8 +27,7 @@ namespace DB
|
||||
|
||||
struct KeyTransaction;
|
||||
using KeyTransactionPtr = std::shared_ptr<KeyTransaction>;
|
||||
using KeyPrefix = std::string;
|
||||
using KeyTransactionsMap = std::unordered_map<KeyPrefix, KeyTransactionPtr>;
|
||||
using KeyTransactionsMap = std::unordered_map<FileCacheKey, KeyTransactionPtr>;
|
||||
|
||||
|
||||
/// Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
|
||||
@ -169,14 +168,17 @@ private:
|
||||
std::atomic<bool> is_initialized = false;
|
||||
mutable std::mutex init_mutex;
|
||||
|
||||
using CachedFiles = std::unordered_map<Key, CacheCellsPtr>;
|
||||
struct CachedFilesMetadata
|
||||
{
|
||||
CacheCellsPtr cells;
|
||||
KeyGuardPtr guard;
|
||||
|
||||
CachedFilesMetadata() : cells(std::make_shared<CacheCells>()), guard(std::make_shared<KeyGuard>()) {}
|
||||
};
|
||||
using CachedFiles = std::unordered_map<Key, CachedFilesMetadata>;
|
||||
CachedFiles files;
|
||||
|
||||
using KeyPrefix = std::string;
|
||||
using KeysLocksMap = std::unordered_map<KeyPrefix, KeyPrefixGuardPtr>;
|
||||
KeysLocksMap keys_locks;
|
||||
|
||||
mutable std::mutex key_locks_and_files_mutex; /// Protects `files` and `keys_locks`
|
||||
mutable std::mutex files_mutex; /// Protects `files` and `keys_locks`
|
||||
|
||||
enum class KeyNotFoundPolicy
|
||||
{
|
||||
@ -280,7 +282,7 @@ public:
|
||||
private:
|
||||
void assertInitialized() const;
|
||||
|
||||
void loadCacheInfoIntoMemory();
|
||||
void loadMetadata();
|
||||
|
||||
FileSegments getImpl(
|
||||
const Key & key,
|
||||
@ -335,7 +337,7 @@ private:
|
||||
KeyTransactionPtr key_transaction,
|
||||
const CachePriorityQueueGuard::Lock &);
|
||||
|
||||
void removeKeyDirectoryIfExists(const Key & key, const KeyPrefixGuard::Lock & lock) const;
|
||||
void removeKeyDirectoryIfExists(const Key & key, const KeyGuard::Lock & lock) const;
|
||||
|
||||
struct IterateAndLockResult
|
||||
{
|
||||
@ -353,12 +355,12 @@ private:
|
||||
struct KeyTransactionCreator
|
||||
{
|
||||
KeyTransactionCreator(
|
||||
KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_)
|
||||
KeyGuardPtr guard_, FileCache::CacheCellsPtr offsets_)
|
||||
: guard(guard_) , offsets(offsets_) {}
|
||||
|
||||
KeyTransactionPtr create();
|
||||
|
||||
KeyPrefixGuardPtr guard;
|
||||
KeyGuardPtr guard;
|
||||
FileCache::CacheCellsPtr offsets;
|
||||
};
|
||||
using KeyTransactionCreatorPtr = std::unique_ptr<KeyTransactionCreator>;
|
||||
@ -367,7 +369,7 @@ struct KeyTransaction : private boost::noncopyable
|
||||
{
|
||||
using Key = FileCacheKey;
|
||||
|
||||
KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPtr offsets_);
|
||||
KeyTransaction(KeyGuardPtr guard_, FileCache::CacheCellsPtr offsets_);
|
||||
|
||||
KeyTransactionCreatorPtr getCreator() const { return std::make_unique<KeyTransactionCreator>(guard, offsets); }
|
||||
|
||||
@ -385,8 +387,8 @@ struct KeyTransaction : private boost::noncopyable
|
||||
std::vector<size_t> delete_offsets;
|
||||
|
||||
private:
|
||||
KeyPrefixGuardPtr guard;
|
||||
const KeyPrefixGuard::Lock lock;
|
||||
KeyGuardPtr guard;
|
||||
const KeyGuard::Lock lock;
|
||||
|
||||
FileCache::CacheCellsPtr offsets;
|
||||
|
||||
|
@ -10,13 +10,11 @@ namespace DB
|
||||
|
||||
FileCacheKey::FileCacheKey(const std::string & path)
|
||||
: key(sipHash128(path.data(), path.size()))
|
||||
, key_prefix(toString().substr(0, 3))
|
||||
{
|
||||
}
|
||||
|
||||
FileCacheKey::FileCacheKey(const UInt128 & key_)
|
||||
: key(key_)
|
||||
, key_prefix(toString().substr(0, 3))
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -6,10 +6,8 @@ namespace DB
|
||||
|
||||
struct FileCacheKey
|
||||
{
|
||||
/// Hash of the path.
|
||||
UInt128 key;
|
||||
/// Prefix of the path.
|
||||
std::string key_prefix;
|
||||
using KeyHash = UInt128;
|
||||
KeyHash key;
|
||||
|
||||
std::string toString() const;
|
||||
|
||||
|
@ -15,11 +15,11 @@ namespace DB
|
||||
* Guard for a set of keys.
|
||||
* One guard per key prefix (first three digits of the path hash).
|
||||
*/
|
||||
struct KeyPrefixGuard
|
||||
struct KeyGuard
|
||||
{
|
||||
struct Lock
|
||||
{
|
||||
explicit Lock(KeyPrefixGuard & guard) : lock(guard.mutex) {}
|
||||
explicit Lock(KeyGuard & guard) : lock(guard.mutex) {}
|
||||
std::unique_lock<std::mutex> lock;
|
||||
};
|
||||
|
||||
@ -27,9 +27,9 @@ struct KeyPrefixGuard
|
||||
|
||||
Lock lock() { return Lock(*this); }
|
||||
|
||||
KeyPrefixGuard() = default;
|
||||
KeyGuard() = default;
|
||||
};
|
||||
using KeyPrefixGuardPtr = std::shared_ptr<KeyPrefixGuard>;
|
||||
using KeyGuardPtr = std::shared_ptr<KeyGuard>;
|
||||
|
||||
/**
|
||||
* Cache priority queue guard.
|
||||
|
Loading…
Reference in New Issue
Block a user