mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 01:30:51 +00:00
Merge pull request #51161 from kssenii/more-metrics-in-cache
More profile events for fs cache
This commit is contained in:
commit
c3227fd44f
@ -381,11 +381,25 @@ The server successfully detected this situation and will download merged part fr
|
||||
M(CachedReadBufferReadFromCacheBytes, "Bytes read from filesystem cache") \
|
||||
M(CachedReadBufferCacheWriteBytes, "Bytes written from source (remote fs, etc) to filesystem cache") \
|
||||
M(CachedReadBufferCacheWriteMicroseconds, "Time spent writing data into filesystem cache") \
|
||||
M(CachedReadBufferCreateBufferMicroseconds, "Prepare buffer time") \
|
||||
M(CachedWriteBufferCacheWriteBytes, "Bytes written from source (remote fs, etc) to filesystem cache") \
|
||||
M(CachedWriteBufferCacheWriteMicroseconds, "Time spent writing data into filesystem cache") \
|
||||
\
|
||||
M(FilesystemCacheEvictedBytes, "Number of bytes evicted from filesystem cache") \
|
||||
M(FilesystemCacheEvictedFileSegments, "Number of file segments evicted from filesystem cache") \
|
||||
M(FilesystemCacheLockKeyMicroseconds, "Lock cache key time") \
|
||||
M(FilesystemCacheLockMetadataMicroseconds, "Lock filesystem cache metadata time") \
|
||||
M(FilesystemCacheLockCacheMicroseconds, "Lock filesystem cache time") \
|
||||
M(FilesystemCacheReserveMicroseconds, "Filesystem cache space reservation time") \
|
||||
M(FilesystemCacheGetOrSetMicroseconds, "Filesystem cache getOrSet() time") \
|
||||
M(FilesystemCacheGetMicroseconds, "Filesystem cache get() time") \
|
||||
M(FileSegmentWaitMicroseconds, "Wait on DOWNLOADING state") \
|
||||
M(FileSegmentCompleteMicroseconds, "Duration of FileSegment::complete() in filesystem cache") \
|
||||
M(FileSegmentLockMicroseconds, "Lock file segment time") \
|
||||
M(FileSegmentWriteMicroseconds, "File segment write() time") \
|
||||
M(FileSegmentUseMicroseconds, "File segment use() time") \
|
||||
M(FileSegmentRemoveMicroseconds, "File segment remove() time") \
|
||||
M(FileSegmentHolderCompleteMicroseconds, "File segments holder complete() time") \
|
||||
\
|
||||
M(RemoteFSSeeks, "Total number of seeks for async buffer") \
|
||||
M(RemoteFSPrefetches, "Number of prefetches made with asynchronous reading from remote filesystem") \
|
||||
@ -407,7 +421,6 @@ The server successfully detected this situation and will download merged part fr
|
||||
\
|
||||
M(FileSegmentWaitReadBufferMicroseconds, "Metric per file segment. Time spend waiting for internal read buffer (includes cache waiting)") \
|
||||
M(FileSegmentReadMicroseconds, "Metric per file segment. Time spend reading from file") \
|
||||
M(FileSegmentWriteMicroseconds, "Metric per file segment. Time spend writing cache") \
|
||||
M(FileSegmentCacheWriteMicroseconds, "Metric per file segment. Time spend writing data to cache") \
|
||||
M(FileSegmentPredownloadMicroseconds, "Metric per file segment. Time spent predownloading data to cache (predownloading - finishing file segment download (after someone who failed to do that) up to the point current thread was requested to do)") \
|
||||
M(FileSegmentUsedBytes, "Metric per file segment. How many bytes were actually used from current file segment") \
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <IO/BoundedReadBuffer.h>
|
||||
#include <Common/getRandomASCIIString.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/ElapsedTimeProfileEventIncrement.h>
|
||||
#include <base/hex.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
@ -26,6 +27,7 @@ extern const Event CachedReadBufferCacheWriteMicroseconds;
|
||||
extern const Event CachedReadBufferReadFromSourceBytes;
|
||||
extern const Event CachedReadBufferReadFromCacheBytes;
|
||||
extern const Event CachedReadBufferCacheWriteBytes;
|
||||
extern const Event CachedReadBufferCreateBufferMicroseconds;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -145,6 +147,8 @@ void CachedOnDiskReadBufferFromFile::initialize(size_t offset, size_t size)
|
||||
CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
|
||||
CachedOnDiskReadBufferFromFile::getCacheReadBuffer(const FileSegment & file_segment) const
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::CachedReadBufferCreateBufferMicroseconds);
|
||||
|
||||
/// Use is_persistent flag from in-memory state of the filesegment,
|
||||
/// because it is consistent with what is written on disk.
|
||||
auto path = file_segment.getPathInLocalCache();
|
||||
@ -167,6 +171,8 @@ CachedOnDiskReadBufferFromFile::getCacheReadBuffer(const FileSegment & file_segm
|
||||
CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
|
||||
CachedOnDiskReadBufferFromFile::getRemoteReadBuffer(FileSegment & file_segment, ReadType read_type_)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::CachedReadBufferCreateBufferMicroseconds);
|
||||
|
||||
switch (read_type_)
|
||||
{
|
||||
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
|
||||
@ -401,6 +407,8 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegment & file_segme
|
||||
current_file_segment_counters.increment(
|
||||
ProfileEvents::FileSegmentWaitReadBufferMicroseconds, watch.elapsedMicroseconds());
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::FileSegmentWaitReadBufferMicroseconds, watch.elapsedMicroseconds());
|
||||
|
||||
[[maybe_unused]] auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
|
||||
chassert(download_current_segment == file_segment.isDownloader());
|
||||
|
||||
|
@ -138,6 +138,7 @@ void CachedObjectStorage::removeCacheIfExists(const std::string & path_key_for_c
|
||||
|
||||
void CachedObjectStorage::removeObject(const StoredObject & object)
|
||||
{
|
||||
removeCacheIfExists(object.remote_path);
|
||||
object_storage->removeObject(object);
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <base/hex.h>
|
||||
#include <pcg-random/pcg_random.hpp>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/ElapsedTimeProfileEventIncrement.h>
|
||||
|
||||
#include <filesystem>
|
||||
|
||||
@ -21,6 +22,10 @@ namespace ProfileEvents
|
||||
{
|
||||
extern const Event FilesystemCacheEvictedBytes;
|
||||
extern const Event FilesystemCacheEvictedFileSegments;
|
||||
extern const Event FilesystemCacheLockCacheMicroseconds;
|
||||
extern const Event FilesystemCacheReserveMicroseconds;
|
||||
extern const Event FilesystemCacheGetOrSetMicroseconds;
|
||||
extern const Event FilesystemCacheGetMicroseconds;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -129,6 +134,12 @@ void FileCache::initialize()
|
||||
cleanup_task->scheduleAfter(delayed_cleanup_interval_ms);
|
||||
}
|
||||
|
||||
CacheGuard::Lock FileCache::lockCache() const
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheLockCacheMicroseconds);
|
||||
return cache_guard.lock();
|
||||
}
|
||||
|
||||
FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range) const
|
||||
{
|
||||
/// Given range = [left, right] and non-overlapping ordered set of file segments,
|
||||
@ -414,6 +425,8 @@ FileSegmentsHolderPtr FileCache::set(
|
||||
FileSegmentsHolderPtr
|
||||
FileCache::getOrSet(const Key & key, size_t offset, size_t size, size_t file_size, const CreateFileSegmentSettings & settings)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheGetOrSetMicroseconds);
|
||||
|
||||
assertInitialized();
|
||||
|
||||
const auto aligned_offset = roundDownToMultiple(offset, boundary_alignment);
|
||||
@ -448,6 +461,8 @@ FileCache::getOrSet(const Key & key, size_t offset, size_t size, size_t file_siz
|
||||
|
||||
FileSegmentsHolderPtr FileCache::get(const Key & key, size_t offset, size_t size)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheGetMicroseconds);
|
||||
|
||||
assertInitialized();
|
||||
|
||||
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::RETURN_NULL);
|
||||
@ -562,8 +577,10 @@ KeyMetadata::iterator FileCache::addFileSegment(
|
||||
|
||||
bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheReserveMicroseconds);
|
||||
|
||||
assertInitialized();
|
||||
auto cache_lock = cache_guard.lock();
|
||||
auto cache_lock = lockCache();
|
||||
|
||||
LOG_TEST(
|
||||
log, "Trying to reserve space ({} bytes) for {}:{}, current usage {}/{}",
|
||||
@ -799,7 +816,7 @@ void FileCache::removeAllReleasable()
|
||||
/// `remove_persistent_files` defines whether non-evictable by some criteria files
|
||||
/// (they do not comply with the cache eviction policy) should also be removed.
|
||||
|
||||
auto lock = cache_guard.lock();
|
||||
auto lock = lockCache();
|
||||
|
||||
main_priority->iterate([&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata)
|
||||
{
|
||||
@ -822,7 +839,7 @@ void FileCache::removeAllReleasable()
|
||||
|
||||
void FileCache::loadMetadata()
|
||||
{
|
||||
auto lock = cache_guard.lock();
|
||||
auto lock = lockCache();
|
||||
|
||||
UInt64 offset = 0;
|
||||
size_t size = 0;
|
||||
@ -1039,7 +1056,7 @@ FileSegmentsHolderPtr FileCache::dumpQueue()
|
||||
{
|
||||
file_segments.push_back(FileSegment::getSnapshot(segment_metadata->file_segment));
|
||||
return PriorityIterationResult::CONTINUE;
|
||||
}, cache_guard.lock());
|
||||
}, lockCache());
|
||||
|
||||
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
|
||||
}
|
||||
@ -1064,17 +1081,17 @@ std::vector<String> FileCache::tryGetCachePaths(const Key & key)
|
||||
|
||||
size_t FileCache::getUsedCacheSize() const
|
||||
{
|
||||
return main_priority->getSize(cache_guard.lock());
|
||||
return main_priority->getSize(lockCache());
|
||||
}
|
||||
|
||||
size_t FileCache::getFileSegmentsNum() const
|
||||
{
|
||||
return main_priority->getElementsCount(cache_guard.lock());
|
||||
return main_priority->getElementsCount(lockCache());
|
||||
}
|
||||
|
||||
void FileCache::assertCacheCorrectness()
|
||||
{
|
||||
auto lock = cache_guard.lock();
|
||||
auto lock = lockCache();
|
||||
main_priority->iterate([&](LockedKey &, FileSegmentMetadataPtr segment_metadata)
|
||||
{
|
||||
const auto & file_segment = *segment_metadata->file_segment;
|
||||
@ -1100,7 +1117,7 @@ FileCache::QueryContextHolder::~QueryContextHolder()
|
||||
/// the query has been completed and the query_context is released.
|
||||
if (context && context.use_count() == 2)
|
||||
{
|
||||
auto lock = cache->cache_guard.lock();
|
||||
auto lock = cache->lockCache();
|
||||
cache->query_limit->removeQueryContext(query_id, lock);
|
||||
}
|
||||
}
|
||||
@ -1111,7 +1128,7 @@ FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder(
|
||||
if (!query_limit || settings.filesystem_cache_max_download_size == 0)
|
||||
return {};
|
||||
|
||||
auto lock = cache_guard.lock();
|
||||
auto lock = lockCache();
|
||||
auto context = query_limit->getOrSetQueryContext(query_id, settings, lock);
|
||||
return std::make_unique<QueryContextHolder>(query_id, this, std::move(context));
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ public:
|
||||
using QueryContextHolderPtr = std::unique_ptr<QueryContextHolder>;
|
||||
QueryContextHolderPtr getQueryContextHolder(const String & query_id, const ReadSettings & settings);
|
||||
|
||||
CacheGuard::Lock lockCache() { return cache_guard.lock(); }
|
||||
CacheGuard::Lock lockCache() const;
|
||||
|
||||
private:
|
||||
using KeyAndOffset = FileCacheKeyAndOffset;
|
||||
|
@ -9,11 +9,22 @@
|
||||
#include <Common/OpenTelemetryTraceContext.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/scope_guard_safe.h>
|
||||
#include <Common/ElapsedTimeProfileEventIncrement.h>
|
||||
|
||||
#include <magic_enum.hpp>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event FileSegmentWaitMicroseconds;
|
||||
extern const Event FileSegmentCompleteMicroseconds;
|
||||
extern const Event FileSegmentLockMicroseconds;
|
||||
extern const Event FileSegmentWriteMicroseconds;
|
||||
extern const Event FileSegmentUseMicroseconds;
|
||||
extern const Event FileSegmentHolderCompleteMicroseconds;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -83,9 +94,15 @@ FileSegment::FileSegment(
|
||||
}
|
||||
}
|
||||
|
||||
FileSegment::Range::Range(size_t left_, size_t right_) : left(left_), right(right_)
|
||||
{
|
||||
if (left > right)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to create incorrect range: [{}, {}]", left, right);
|
||||
}
|
||||
|
||||
FileSegment::State FileSegment::state() const
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
return download_state;
|
||||
}
|
||||
|
||||
@ -94,6 +111,12 @@ String FileSegment::getPathInLocalCache() const
|
||||
return getKeyMetadata()->getFileSegmentPath(*this);
|
||||
}
|
||||
|
||||
FileSegmentGuard::Lock FileSegment::lockFileSegment() const
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentLockMicroseconds);
|
||||
return segment_guard.lock();
|
||||
}
|
||||
|
||||
void FileSegment::setDownloadState(State state, const FileSegmentGuard::Lock & lock)
|
||||
{
|
||||
if (isCompleted(false) && state != State::DETACHED)
|
||||
@ -110,19 +133,19 @@ void FileSegment::setDownloadState(State state, const FileSegmentGuard::Lock & l
|
||||
|
||||
size_t FileSegment::getReservedSize() const
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
return reserved_size;
|
||||
}
|
||||
|
||||
FileSegment::Priority::Iterator FileSegment::getQueueIterator() const
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
return queue_iterator;
|
||||
}
|
||||
|
||||
void FileSegment::setQueueIterator(Priority::Iterator iterator)
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
if (queue_iterator)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Queue iterator cannot be set twice");
|
||||
queue_iterator = iterator;
|
||||
@ -150,14 +173,14 @@ size_t FileSegment::getDownloadedSize(bool sync) const
|
||||
|
||||
void FileSegment::setDownloadedSize(size_t delta)
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
downloaded_size += delta;
|
||||
assert(downloaded_size == std::filesystem::file_size(getPathInLocalCache()));
|
||||
}
|
||||
|
||||
bool FileSegment::isDownloaded() const
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
return download_state == State::DOWNLOADED;
|
||||
}
|
||||
|
||||
@ -173,7 +196,7 @@ String FileSegment::getCallerId()
|
||||
|
||||
String FileSegment::getDownloader() const
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
return getDownloaderUnlocked(lock);
|
||||
}
|
||||
|
||||
@ -184,7 +207,7 @@ String FileSegment::getDownloaderUnlocked(const FileSegmentGuard::Lock &) const
|
||||
|
||||
String FileSegment::getOrSetDownloader()
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
|
||||
assertNotDetachedUnlocked(lock);
|
||||
|
||||
@ -220,7 +243,7 @@ void FileSegment::resetDownloadingStateUnlocked(const FileSegmentGuard::Lock & l
|
||||
|
||||
void FileSegment::resetDownloader()
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
|
||||
SCOPE_EXIT({ cv.notify_all(); });
|
||||
|
||||
@ -255,7 +278,7 @@ void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, cons
|
||||
|
||||
bool FileSegment::isDownloader() const
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
return isDownloaderUnlocked(lock);
|
||||
}
|
||||
|
||||
@ -266,14 +289,14 @@ bool FileSegment::isDownloaderUnlocked(const FileSegmentGuard::Lock & lock) cons
|
||||
|
||||
FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
assertIsDownloaderUnlocked("getRemoteFileReader", lock);
|
||||
return remote_file_reader;
|
||||
}
|
||||
|
||||
void FileSegment::resetRemoteFileReader()
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
assertIsDownloaderUnlocked("resetRemoteFileReader", lock);
|
||||
remote_file_reader.reset();
|
||||
}
|
||||
@ -287,7 +310,7 @@ FileSegment::RemoteFileReaderPtr FileSegment::extractRemoteFileReader()
|
||||
return std::move(remote_file_reader);
|
||||
}
|
||||
|
||||
auto segment_lock = segment_guard.lock();
|
||||
auto segment_lock = lockFileSegment();
|
||||
|
||||
assert(download_state != State::DETACHED);
|
||||
|
||||
@ -300,7 +323,7 @@ FileSegment::RemoteFileReaderPtr FileSegment::extractRemoteFileReader()
|
||||
|
||||
void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
assertIsDownloaderUnlocked("setRemoteFileReader", lock);
|
||||
|
||||
if (remote_file_reader)
|
||||
@ -311,13 +334,15 @@ void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
|
||||
|
||||
void FileSegment::write(const char * from, size_t size, size_t offset)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentWriteMicroseconds);
|
||||
|
||||
if (!size)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing zero size is not allowed");
|
||||
|
||||
const auto file_segment_path = getPathInLocalCache();
|
||||
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
|
||||
assertIsDownloaderUnlocked("write", lock);
|
||||
assertNotDetachedUnlocked(lock);
|
||||
@ -372,7 +397,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
|
||||
}
|
||||
catch (ErrnoException & e)
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
e.addMessage(fmt::format("{}, current cache state: {}", e.what(), getInfoForLogUnlocked(lock)));
|
||||
|
||||
int code = e.getErrno();
|
||||
@ -392,7 +417,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
e.addMessage(fmt::format("{}, current cache state: {}", e.what(), getInfoForLogUnlocked(lock)));
|
||||
setDownloadFailedUnlocked(lock);
|
||||
throw;
|
||||
@ -405,7 +430,7 @@ FileSegment::State FileSegment::wait(size_t offset)
|
||||
{
|
||||
OpenTelemetry::SpanHolder span{fmt::format("FileSegment::wait({})", key().toString())};
|
||||
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
|
||||
if (downloader_id.empty() || offset < getCurrentWriteOffset(true))
|
||||
return download_state;
|
||||
@ -416,13 +441,14 @@ FileSegment::State FileSegment::wait(size_t offset)
|
||||
if (download_state == State::DOWNLOADING)
|
||||
{
|
||||
LOG_TEST(log, "{} waiting on: {}, current downloader: {}", getCallerId(), range().toString(), downloader_id);
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentWaitMicroseconds);
|
||||
|
||||
chassert(!getDownloaderUnlocked(lock).empty());
|
||||
chassert(!isDownloaderUnlocked(lock));
|
||||
|
||||
[[maybe_unused]] const auto ok = cv.wait_for(lock, std::chrono::seconds(60), [&, this]()
|
||||
{
|
||||
return download_state != State::DOWNLOADING || offset < getCurrentWriteOffset(true);
|
||||
return download_state != State::DOWNLOADING || offset < getCurrentWriteOffset(false);
|
||||
});
|
||||
/// chassert(ok);
|
||||
}
|
||||
@ -507,7 +533,7 @@ bool FileSegment::reserve(size_t size_to_reserve)
|
||||
reserved = cache->tryReserve(*this, size_to_reserve);
|
||||
|
||||
if (!reserved)
|
||||
setDownloadFailedUnlocked(segment_guard.lock());
|
||||
setDownloadFailedUnlocked(lockFileSegment());
|
||||
|
||||
return reserved;
|
||||
}
|
||||
@ -549,7 +575,7 @@ void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock)
|
||||
|
||||
void FileSegment::completePartAndResetDownloader()
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
|
||||
SCOPE_EXIT({ cv.notify_all(); });
|
||||
|
||||
@ -569,6 +595,8 @@ void FileSegment::completePartAndResetDownloader()
|
||||
|
||||
void FileSegment::complete()
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentCompleteMicroseconds);
|
||||
|
||||
if (isCompleted())
|
||||
return;
|
||||
|
||||
@ -582,7 +610,7 @@ void FileSegment::complete()
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot complete file segment: {}", getInfoForLog());
|
||||
}
|
||||
|
||||
auto segment_lock = segment_guard.lock();
|
||||
auto segment_lock = lockFileSegment();
|
||||
|
||||
if (isCompleted(false))
|
||||
return;
|
||||
@ -687,7 +715,7 @@ void FileSegment::complete()
|
||||
|
||||
String FileSegment::getInfoForLog() const
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
return getInfoForLogUnlocked(lock);
|
||||
}
|
||||
|
||||
@ -731,7 +759,7 @@ String FileSegment::stateToString(FileSegment::State state)
|
||||
|
||||
bool FileSegment::assertCorrectness() const
|
||||
{
|
||||
return assertCorrectnessUnlocked(segment_guard.lock());
|
||||
return assertCorrectnessUnlocked(lockFileSegment());
|
||||
}
|
||||
|
||||
bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) const
|
||||
@ -779,7 +807,7 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) cons
|
||||
|
||||
void FileSegment::assertNotDetached() const
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
assertNotDetachedUnlocked(lock);
|
||||
}
|
||||
|
||||
@ -797,7 +825,7 @@ void FileSegment::assertNotDetachedUnlocked(const FileSegmentGuard::Lock & lock)
|
||||
|
||||
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment)
|
||||
{
|
||||
auto lock = file_segment->segment_guard.lock();
|
||||
auto lock = file_segment->lockFileSegment();
|
||||
|
||||
auto snapshot = std::make_shared<FileSegment>(
|
||||
file_segment->key(),
|
||||
@ -816,7 +844,7 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment)
|
||||
|
||||
bool FileSegment::isDetached() const
|
||||
{
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
return download_state == State::DETACHED;
|
||||
}
|
||||
|
||||
@ -832,7 +860,7 @@ bool FileSegment::isCompleted(bool sync) const
|
||||
if (is_completed_state())
|
||||
return true;
|
||||
|
||||
auto lock = segment_guard.lock();
|
||||
auto lock = lockFileSegment();
|
||||
return is_completed_state();
|
||||
}
|
||||
|
||||
@ -858,6 +886,8 @@ void FileSegment::detach(const FileSegmentGuard::Lock & lock, const LockedKey &)
|
||||
|
||||
void FileSegment::use()
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentUseMicroseconds);
|
||||
|
||||
if (!cache)
|
||||
{
|
||||
chassert(isCompleted(true));
|
||||
@ -880,6 +910,8 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl()
|
||||
|
||||
FileSegmentsHolder::~FileSegmentsHolder()
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentHolderCompleteMicroseconds);
|
||||
|
||||
if (!complete_on_dtor)
|
||||
return;
|
||||
|
||||
|
@ -130,7 +130,7 @@ public:
|
||||
size_t left;
|
||||
size_t right;
|
||||
|
||||
Range(size_t left_, size_t right_) : left(left_), right(right_) {}
|
||||
Range(size_t left_, size_t right_);
|
||||
|
||||
bool operator==(const Range & other) const { return left == other.left && right == other.right; }
|
||||
|
||||
@ -293,6 +293,7 @@ private:
|
||||
bool assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) const;
|
||||
|
||||
LockedKeyPtr lockKeyMetadata(bool assert_exists = true) const;
|
||||
FileSegmentGuard::Lock lockFileSegment() const;
|
||||
|
||||
Key file_key;
|
||||
Range segment_range;
|
||||
|
@ -2,10 +2,17 @@
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Interpreters/Cache/FileSegment.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/ElapsedTimeProfileEventIncrement.h>
|
||||
#include <filesystem>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event FilesystemCacheLockKeyMicroseconds;
|
||||
extern const Event FilesystemCacheLockMetadataMicroseconds;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -69,6 +76,8 @@ LockedKeyPtr KeyMetadata::lock()
|
||||
|
||||
LockedKeyPtr KeyMetadata::tryLock()
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheLockKeyMicroseconds);
|
||||
|
||||
auto locked = std::make_unique<LockedKey>(shared_from_this());
|
||||
if (key_state == KeyMetadata::KeyState::ACTIVE)
|
||||
return locked;
|
||||
@ -156,6 +165,12 @@ String CacheMetadata::getPathForKey(const Key & key) const
|
||||
return fs::path(path) / key_str.substr(0, 3) / key_str;
|
||||
}
|
||||
|
||||
CacheMetadataGuard::Lock CacheMetadata::lockMetadata() const
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheLockMetadataMicroseconds);
|
||||
return guard.lock();
|
||||
}
|
||||
|
||||
LockedKeyPtr CacheMetadata::lockKeyMetadata(
|
||||
const FileCacheKey & key,
|
||||
KeyNotFoundPolicy key_not_found_policy,
|
||||
@ -163,7 +178,7 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
|
||||
{
|
||||
KeyMetadataPtr key_metadata;
|
||||
{
|
||||
auto lock = guard.lock();
|
||||
auto lock = lockMetadata();
|
||||
|
||||
auto it = find(key);
|
||||
if (it == end())
|
||||
@ -182,9 +197,13 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
|
||||
}
|
||||
|
||||
{
|
||||
auto locked_metadata = std::make_unique<LockedKey>(key_metadata);
|
||||
const auto key_state = locked_metadata->getKeyState();
|
||||
LockedKeyPtr locked_metadata;
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheLockKeyMicroseconds);
|
||||
locked_metadata = std::make_unique<LockedKey>(key_metadata);
|
||||
}
|
||||
|
||||
const auto key_state = locked_metadata->getKeyState();
|
||||
if (key_state == KeyMetadata::KeyState::ACTIVE)
|
||||
return locked_metadata;
|
||||
|
||||
@ -213,10 +232,15 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
|
||||
|
||||
void CacheMetadata::iterate(IterateCacheMetadataFunc && func)
|
||||
{
|
||||
auto lock = guard.lock();
|
||||
auto lock = lockMetadata();
|
||||
for (const auto & [key, key_metadata] : *this)
|
||||
{
|
||||
auto locked_key = std::make_unique<LockedKey>(key_metadata);
|
||||
LockedKeyPtr locked_key;
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheLockKeyMicroseconds);
|
||||
locked_key = std::make_unique<LockedKey>(key_metadata);
|
||||
}
|
||||
|
||||
const auto key_state = locked_key->getKeyState();
|
||||
|
||||
if (key_state == KeyMetadata::KeyState::ACTIVE)
|
||||
@ -235,7 +259,7 @@ void CacheMetadata::iterate(IterateCacheMetadataFunc && func)
|
||||
|
||||
void CacheMetadata::doCleanup()
|
||||
{
|
||||
auto lock = guard.lock();
|
||||
auto lock = lockMetadata();
|
||||
|
||||
FileCacheKey cleanup_key;
|
||||
while (cleanup_queue->tryPop(cleanup_key))
|
||||
@ -244,9 +268,13 @@ void CacheMetadata::doCleanup()
|
||||
if (it == end())
|
||||
continue;
|
||||
|
||||
auto locked_metadata = std::make_unique<LockedKey>(it->second);
|
||||
const auto key_state = locked_metadata->getKeyState();
|
||||
LockedKeyPtr locked_metadata;
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheLockKeyMicroseconds);
|
||||
locked_metadata = std::make_unique<LockedKey>(it->second);
|
||||
}
|
||||
|
||||
const auto key_state = locked_metadata->getKeyState();
|
||||
if (key_state == KeyMetadata::KeyState::ACTIVE)
|
||||
{
|
||||
/// Key was added back to cache after we submitted it to removal queue.
|
||||
|
@ -110,8 +110,9 @@ public:
|
||||
void doCleanup();
|
||||
|
||||
private:
|
||||
CacheMetadataGuard::Lock lockMetadata() const;
|
||||
const std::string path; /// Cache base path
|
||||
CacheMetadataGuard guard;
|
||||
mutable CacheMetadataGuard guard;
|
||||
const CleanupQueuePtr cleanup_queue;
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user