This commit is contained in:
kssenii 2023-01-31 14:48:51 +01:00
parent 0378edc3fe
commit f497e3bcbe
7 changed files with 93 additions and 103 deletions

View File

@ -502,7 +502,7 @@ FileCache::KeyMetadata::iterator FileCache::addCell(
FileSegment::State state, FileSegment::State state,
const CreateFileSegmentSettings & settings, const CreateFileSegmentSettings & settings,
KeyTransaction & key_transaction, KeyTransaction & key_transaction,
CacheGuard::LockPtr * lock) const CacheGuard::Lock * lock)
{ {
/// Create a file segment cell and put it in `files` map by [key][offset]. /// Create a file segment cell and put it in `files` map by [key][offset].
@ -579,7 +579,7 @@ bool FileCache::tryReserveUnlocked(
size_t offset, size_t offset,
size_t size, size_t size,
KeyTransactionPtr key_transaction, KeyTransactionPtr key_transaction,
CacheGuard::LockPtr lock) const CacheGuard::Lock & lock)
{ {
auto query_context = query_limit ? query_limit->tryGetQueryContext(lock) : nullptr; auto query_context = query_limit ? query_limit->tryGetQueryContext(lock) : nullptr;
bool reserved; bool reserved;
@ -635,7 +635,7 @@ bool FileCache::tryReserveImpl(
size_t size, size_t size,
KeyTransactionPtr key_transaction, KeyTransactionPtr key_transaction,
QueryLimit::LockedQueryContext * query_context, QueryLimit::LockedQueryContext * query_context,
CacheGuard::LockPtr priority_lock) const CacheGuard::Lock & priority_lock)
{ {
/// Iterate cells in the priority of `priority_queue`. /// Iterate cells in the priority of `priority_queue`.
/// If some entry is in `priority_queue` it must be guaranteed to have a /// If some entry is in `priority_queue` it must be guaranteed to have a
@ -863,7 +863,7 @@ KeyTransaction::~KeyTransaction()
cleanupKeyDirectory(); cleanupKeyDirectory();
} }
void KeyTransaction::remove(FileSegmentPtr file_segment, CacheGuard::LockPtr cache_lock) void KeyTransaction::remove(FileSegmentPtr file_segment, const CacheGuard::Lock & cache_lock)
{ {
/// We must hold pointer to file segment while removing it. /// We must hold pointer to file segment while removing it.
chassert(file_segment->key() == key); chassert(file_segment->key() == key);
@ -879,7 +879,7 @@ bool KeyTransaction::isLastHolder(size_t offset)
void KeyTransaction::remove( void KeyTransaction::remove(
size_t offset, size_t offset,
const FileSegmentGuard::Lock & segment_lock, const FileSegmentGuard::Lock & segment_lock,
CacheGuard::LockPtr cache_lock) const CacheGuard::Lock & cache_lock)
{ {
LOG_DEBUG( LOG_DEBUG(
log, "Remove from cache. Key: {}, offset: {}", log, "Remove from cache. Key: {}, offset: {}",
@ -1059,7 +1059,7 @@ void FileCache::loadMetadata()
void KeyTransaction::reduceSizeToDownloaded( void KeyTransaction::reduceSizeToDownloaded(
size_t offset, size_t offset,
const FileSegmentGuard::Lock & segment_lock, const FileSegmentGuard::Lock & segment_lock,
CacheGuard::LockPtr cache_lock) const CacheGuard::Lock & cache_lock)
{ {
/** /**
* In case file was partially downloaded and it's download cannot be continued * In case file was partially downloaded and it's download cannot be continued
@ -1323,7 +1323,7 @@ FileCache::QueryContextHolder::~QueryContextHolder()
} }
FileCache::QueryLimit::LockedQueryContextPtr FileCache::QueryLimit::LockedQueryContextPtr
FileCache::QueryLimit::tryGetQueryContext(CacheGuard::LockPtr lock) FileCache::QueryLimit::tryGetQueryContext(const CacheGuard::Lock & lock)
{ {
if (!isQueryInitialized()) if (!isQueryInitialized())
return nullptr; return nullptr;
@ -1332,7 +1332,7 @@ FileCache::QueryLimit::tryGetQueryContext(CacheGuard::LockPtr lock)
return (query_iter == query_map.end()) ? nullptr : std::make_unique<LockedQueryContext>(query_iter->second, lock); return (query_iter == query_map.end()) ? nullptr : std::make_unique<LockedQueryContext>(query_iter->second, lock);
} }
void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, CacheGuard::LockPtr) void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, const CacheGuard::Lock &)
{ {
auto query_iter = query_map.find(query_id); auto query_iter = query_map.find(query_id);
if (query_iter == query_map.end()) if (query_iter == query_map.end())
@ -1348,7 +1348,7 @@ void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, Cac
FileCache::QueryLimit::QueryContextPtr FileCache::QueryLimit::getOrSetQueryContext( FileCache::QueryLimit::QueryContextPtr FileCache::QueryLimit::getOrSetQueryContext(
const std::string & query_id, const std::string & query_id,
const ReadSettings & settings, const ReadSettings & settings,
CacheGuard::LockPtr) const CacheGuard::Lock &)
{ {
if (query_id.empty()) if (query_id.empty())
return nullptr; return nullptr;

View File

@ -15,7 +15,7 @@
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <IO/ReadSettings.h> #include <IO/ReadSettings.h>
#include <Interpreters/Cache/IFileCachePriority.h> #include <Interpreters/Cache/LockedFileCachePriority.h>
#include <Interpreters/Cache/LRUFileCachePriority.h> #include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Interpreters/Cache/FileCacheKey.h> #include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/FileCache_fwd.h> #include <Interpreters/Cache/FileCache_fwd.h>
@ -119,7 +119,7 @@ public:
void assertCacheCorrectness(); void assertCacheCorrectness();
CacheGuard::LockPtr createCacheTransaction() { return cache_guard.lock(); } CacheGuard::Lock createCacheTransaction() { return cache_guard.lock(); }
private: private:
using KeyAndOffset = FileCacheKeyAndOffset; using KeyAndOffset = FileCacheKeyAndOffset;
@ -221,12 +221,12 @@ private:
class LockedQueryContext; class LockedQueryContext;
using LockedQueryContextPtr = std::unique_ptr<LockedQueryContext>; using LockedQueryContextPtr = std::unique_ptr<LockedQueryContext>;
LockedQueryContextPtr tryGetQueryContext(CacheGuard::LockPtr lock); LockedQueryContextPtr tryGetQueryContext(const CacheGuard::Lock & lock);
QueryContextPtr getOrSetQueryContext( QueryContextPtr getOrSetQueryContext(
const std::string & query_id, const ReadSettings & settings, CacheGuard::LockPtr); const std::string & query_id, const ReadSettings & settings, const CacheGuard::Lock &);
void removeQueryContext(const std::string & query_id, CacheGuard::LockPtr); void removeQueryContext(const std::string & query_id, const CacheGuard::Lock &);
private: private:
using QueryContextMap = std::unordered_map<String, QueryContextPtr>; using QueryContextMap = std::unordered_map<String, QueryContextPtr>;
@ -252,7 +252,7 @@ private:
class LockedQueryContext class LockedQueryContext
{ {
public: public:
LockedQueryContext(QueryContextPtr context_, CacheGuard::LockPtr lock_) LockedQueryContext(QueryContextPtr context_, const CacheGuard::Lock & lock_)
: context(context_), lock(lock_), priority(lock_, *context->priority) {} : context(context_), lock(lock_), priority(lock_, *context->priority) {}
IFileCachePriority & getPriority() { return *context->priority; } IFileCachePriority & getPriority() { return *context->priority; }
@ -272,7 +272,7 @@ private:
private: private:
QueryContextPtr context; QueryContextPtr context;
CacheGuard::LockPtr lock; const CacheGuard::Lock & lock;
LockedCachePriority priority; LockedCachePriority priority;
}; };
}; };
@ -328,14 +328,14 @@ private:
FileSegment::State state, FileSegment::State state,
const CreateFileSegmentSettings & create_settings, const CreateFileSegmentSettings & create_settings,
KeyTransaction & key_transaction, KeyTransaction & key_transaction,
CacheGuard::LockPtr * queue_lock); const CacheGuard::Lock *);
bool tryReserveUnlocked( bool tryReserveUnlocked(
const Key & key, const Key & key,
size_t offset, size_t offset,
size_t size, size_t size,
KeyTransactionPtr key_transaction, KeyTransactionPtr key_transaction,
CacheGuard::LockPtr); const CacheGuard::Lock &);
bool tryReserveImpl( bool tryReserveImpl(
IFileCachePriority & priority_queue, IFileCachePriority & priority_queue,
@ -344,7 +344,7 @@ private:
size_t size, size_t size,
KeyTransactionPtr key_transaction, KeyTransactionPtr key_transaction,
QueryLimit::LockedQueryContext * query_context, QueryLimit::LockedQueryContext * query_context,
CacheGuard::LockPtr priority_lock); const CacheGuard::Lock &);
struct IterateAndLockResult struct IterateAndLockResult
{ {
@ -412,11 +412,11 @@ struct KeyTransaction : private boost::noncopyable
KeyTransactionCreatorPtr getCreator() const { return std::make_unique<KeyTransactionCreator>(key, offsets, cleanup_keys_metadata_queue, cache); } KeyTransactionCreatorPtr getCreator() const { return std::make_unique<KeyTransactionCreator>(key, offsets, cleanup_keys_metadata_queue, cache); }
void reduceSizeToDownloaded(size_t offset, const FileSegmentGuard::Lock &, CacheGuard::LockPtr); void reduceSizeToDownloaded(size_t offset, const FileSegmentGuard::Lock &, const CacheGuard::Lock &);
void remove(FileSegmentPtr file_segment, CacheGuard::LockPtr); void remove(FileSegmentPtr file_segment, const CacheGuard::Lock &);
void remove(size_t offset, const FileSegmentGuard::Lock &, CacheGuard::LockPtr); void remove(size_t offset, const FileSegmentGuard::Lock &, const CacheGuard::Lock &);
bool isLastHolder(size_t offset); bool isLastHolder(size_t offset);

View File

@ -398,7 +398,7 @@ FileSegment::State FileSegment::wait()
chassert(!getDownloaderUnlocked(lock).empty()); chassert(!getDownloaderUnlocked(lock).empty());
chassert(!isDownloaderUnlocked(lock)); chassert(!isDownloaderUnlocked(lock));
cv.wait_for(lock.lock, std::chrono::seconds(60)); cv.wait_for(lock, std::chrono::seconds(60));
} }
return download_state; return download_state;
@ -498,7 +498,6 @@ void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock)
LOG_INFO(log, "Settings download as failed: {}", getInfoForLogUnlocked(lock)); LOG_INFO(log, "Settings download as failed: {}", getInfoForLogUnlocked(lock));
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION, lock); setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION, lock);
resetDownloaderUnlocked(lock);
if (cache_writer) if (cache_writer)
{ {
@ -542,7 +541,7 @@ void FileSegment::complete()
return completeUnlocked(*key_transaction, cache_lock); return completeUnlocked(*key_transaction, cache_lock);
} }
void FileSegment::completeUnlocked(KeyTransaction & key_transaction, CacheGuard::LockPtr cache_lock) void FileSegment::completeUnlocked(KeyTransaction & key_transaction, const CacheGuard::Lock & cache_lock)
{ {
auto segment_lock = segment_guard.lock(); auto segment_lock = segment_guard.lock();

View File

@ -294,7 +294,7 @@ private:
/// Function might check if the caller of the method /// Function might check if the caller of the method
/// is the last alive holder of the segment. Therefore, completion and destruction /// is the last alive holder of the segment. Therefore, completion and destruction
/// of the file segment pointer must be done under the same cache mutex. /// of the file segment pointer must be done under the same cache mutex.
void completeUnlocked(KeyTransaction & key_transaction, CacheGuard::LockPtr); void completeUnlocked(KeyTransaction & key_transaction, const CacheGuard::Lock &);
void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock); void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock);
bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const; bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const;

View File

@ -16,18 +16,13 @@ namespace DB
*/ */
struct CacheGuard struct CacheGuard
{ {
struct Lock struct Lock : public std::unique_lock<std::mutex>
{ {
explicit Lock(CacheGuard & guard) : lock(guard.mutex) {} explicit Lock(std::mutex & mutex_) : std::unique_lock<std::mutex>(mutex_) {}
std::unique_lock<std::mutex> lock;
}; };
using LockPtr = std::shared_ptr<Lock>;
Lock lock() { return Lock(mutex); }
std::mutex mutex; std::mutex mutex;
LockPtr lock() { return std::make_shared<Lock>(*this); }
CacheGuard() = default;
}; };
/** /**
@ -35,18 +30,13 @@ struct CacheGuard
*/ */
struct CacheMetadataGuard struct CacheMetadataGuard
{ {
struct Lock struct Lock : public std::unique_lock<std::mutex>
{ {
explicit Lock(CacheMetadataGuard & guard) : lock(guard.mutex) {} explicit Lock(std::mutex & mutex_) : std::unique_lock<std::mutex>(mutex_) {}
std::unique_lock<std::mutex> lock;
}; };
using LockPtr = std::shared_ptr<Lock>;
Lock lock() { return Lock(mutex); }
std::mutex mutex; std::mutex mutex;
Lock lock() { return Lock(*this); }
CacheMetadataGuard() = default;
}; };
/** /**
@ -55,17 +45,13 @@ struct CacheMetadataGuard
*/ */
struct KeyGuard struct KeyGuard
{ {
struct Lock struct Lock : public std::unique_lock<std::mutex>
{ {
explicit Lock(KeyGuard & guard) : lock(guard.mutex) {} explicit Lock(std::mutex & mutex_) : std::unique_lock<std::mutex>(mutex_) {}
std::unique_lock<std::mutex> lock;
}; };
Lock lock() { return Lock(mutex); }
std::mutex mutex; std::mutex mutex;
Lock lock() { return Lock(*this); }
KeyGuard() = default;
}; };
using KeyGuardPtr = std::shared_ptr<KeyGuard>; using KeyGuardPtr = std::shared_ptr<KeyGuard>;
@ -74,15 +60,13 @@ using KeyGuardPtr = std::shared_ptr<KeyGuard>;
*/ */
struct FileSegmentGuard struct FileSegmentGuard
{ {
struct Lock struct Lock : public std::unique_lock<std::mutex>
{ {
explicit Lock(FileSegmentGuard & guard) : lock(guard.mutex) {} explicit Lock(std::mutex & mutex_) : std::unique_lock<std::mutex>(mutex_) {}
std::unique_lock<std::mutex> lock;
}; };
Lock lock() { return Lock(mutex); }
std::mutex mutex; std::mutex mutex;
Lock lock() { return Lock(*this); }
}; };
} }

View File

@ -97,53 +97,4 @@ protected:
virtual void iterate(IterateFunc && func) = 0; virtual void iterate(IterateFunc && func) = 0;
}; };
class LockedCachePriority
{
public:
LockedCachePriority(CacheGuard::LockPtr lock_, IFileCachePriority & priority_queue_)
: lock(lock_), queue(priority_queue_) {}
size_t getElementsLimit() const { return queue.max_elements; }
size_t getSizeLimit() const { return queue.max_size; }
size_t getSize() const { return queue.getSize(); }
size_t getElementsCount() const { return queue.getElementsCount(); }
IFileCachePriority::Iterator add(const FileCacheKey & key, size_t offset, size_t size, KeyTransactionCreatorPtr key_transaction_creator) { return queue.add(key, offset, size, std::move(key_transaction_creator)); }
void pop() { queue.pop(); }
void removeAll() { queue.removeAll(); }
void iterate(IFileCachePriority::IterateFunc && func) { queue.iterate(std::move(func)); }
private:
CacheGuard::LockPtr lock;
IFileCachePriority & queue;
};
class LockedCachePriorityIterator
{
public:
LockedCachePriorityIterator(CacheGuard::LockPtr lock_, IFileCachePriority::Iterator & iterator_)
: lock(lock_), iterator(iterator_) {}
IFileCachePriority::Entry & operator *() { return **iterator; }
const IFileCachePriority::Entry & operator *() const { return **iterator; }
size_t use() { return iterator->use(); }
void incrementSize(ssize_t size) { return iterator->incrementSize(size); }
IFileCachePriority::Iterator remove() { return iterator->remove(); }
private:
CacheGuard::LockPtr lock;
IFileCachePriority::Iterator & iterator;
};
using FileCachePriorityPtr = std::unique_ptr<IFileCachePriority>;
}; };

View File

@ -0,0 +1,56 @@
#include <Interpreters/Cache/IFileCachePriority.h>
namespace DB
{
class LockedCachePriority
{
public:
LockedCachePriority(const CacheGuard::Lock & lock_, IFileCachePriority & priority_queue_)
: lock(lock_), queue(priority_queue_) {}
size_t getElementsLimit() const { return queue.max_elements; }
size_t getSizeLimit() const { return queue.max_size; }
size_t getSize() const { return queue.getSize(); }
size_t getElementsCount() const { return queue.getElementsCount(); }
IFileCachePriority::Iterator add(const FileCacheKey & key, size_t offset, size_t size, KeyTransactionCreatorPtr key_transaction_creator) { return queue.add(key, offset, size, std::move(key_transaction_creator)); }
void pop() { queue.pop(); }
void removeAll() { queue.removeAll(); }
void iterate(IFileCachePriority::IterateFunc && func) { queue.iterate(std::move(func)); }
private:
[[maybe_unused]] const CacheGuard::Lock & lock;
IFileCachePriority & queue;
};
class LockedCachePriorityIterator
{
public:
LockedCachePriorityIterator(const CacheGuard::Lock & lock_, IFileCachePriority::Iterator & iterator_)
: lock(lock_), iterator(iterator_) {}
IFileCachePriority::Entry & operator *() { return **iterator; }
const IFileCachePriority::Entry & operator *() const { return **iterator; }
size_t use() { return iterator->use(); }
void incrementSize(ssize_t size) { return iterator->incrementSize(size); }
IFileCachePriority::Iterator remove() { return iterator->remove(); }
private:
[[maybe_unused]] const CacheGuard::Lock & lock;
IFileCachePriority::Iterator & iterator;
};
using FileCachePriorityPtr = std::unique_ptr<IFileCachePriority>;
}