Fix style check, better priority->iterate

This commit is contained in:
kssenii 2023-04-13 14:44:06 +02:00
parent 58a30213c9
commit ce723ec32d
10 changed files with 69 additions and 66 deletions

View File

@ -632,7 +632,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
bytes_to_predownload = 0; bytes_to_predownload = 0;
file_segment.setBroken(); file_segment.setBroken();
chassert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); chassert(file_segment.state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_TEST(log, "Bypassing cache because for {}", file_segment.getInfoForLog()); LOG_TEST(log, "Bypassing cache because for {}", file_segment.getInfoForLog());

View File

@ -315,9 +315,9 @@ void FileCache::fillHolesWithEmptyFileSegments(
} }
else else
{ {
auto splitted = splitRangeIntoFileSegments( auto split = splitRangeIntoFileSegments(
locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings); locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings);
file_segments.splice(it, std::move(splitted)); file_segments.splice(it, std::move(split));
} }
current_pos = segment_range.right + 1; current_pos = segment_range.right + 1;
@ -342,9 +342,9 @@ void FileCache::fillHolesWithEmptyFileSegments(
} }
else else
{ {
auto splitted = splitRangeIntoFileSegments( auto split = splitRangeIntoFileSegments(
locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings); locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings);
file_segments.splice(file_segments.end(), std::move(splitted)); file_segments.splice(file_segments.end(), std::move(split));
} }
} }
} }
@ -556,29 +556,25 @@ bool FileCache::tryReserve(FileSegment & file_segment, size_t size)
size_t removed_size = 0; size_t removed_size = 0;
std::unordered_map<Key, EvictionCandidates> to_delete; std::unordered_map<Key, EvictionCandidates> to_delete;
auto iterate_func = [&](const PriorityEntry & entry, LockedKey & locked_key) auto iterate_func = [&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata)
{ {
auto file_segment_metadata = locked_key.tryGetByOffset(entry.offset); chassert(segment_metadata->file_segment->getQueueIterator());
if (!file_segment_metadata) const bool is_persistent = allow_persistent_files && segment_metadata->file_segment->isPersistent();
return PriorityIterationResult::REMOVE_AND_CONTINUE; const bool releasable = segment_metadata->releasable() && !is_persistent;
chassert(file_segment_metadata->file_segment->getQueueIterator());
const bool is_persistent = allow_persistent_files && file_segment_metadata->file_segment->isPersistent();
const bool releasable = file_segment_metadata->releasable() && !is_persistent;
if (releasable) if (releasable)
{ {
removed_size += entry.size; removed_size += segment_metadata->size();
--queue_size; --queue_size;
auto segment = file_segment_metadata->file_segment; auto segment = segment_metadata->file_segment;
if (segment->state() == FileSegment::State::DOWNLOADED) if (segment->state() == FileSegment::State::DOWNLOADED)
{ {
const auto & key = segment->key(); const auto & key = segment->key();
auto it = to_delete.find(key); auto it = to_delete.find(key);
if (it == to_delete.end()) if (it == to_delete.end())
it = to_delete.emplace(key, locked_key.getKeyMetadata()).first; it = to_delete.emplace(key, locked_key.getKeyMetadata()).first;
it->second.add(file_segment_metadata); it->second.add(segment_metadata);
return PriorityIterationResult::CONTINUE; return PriorityIterationResult::CONTINUE;
} }
@ -598,8 +594,8 @@ bool FileCache::tryReserve(FileSegment & file_segment, size_t size)
}; };
query_priority->iterate( query_priority->iterate(
[&](const auto & entry, LockedKey & locked_key) [&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata)
{ return is_query_priority_overflow() ? iterate_func(entry, locked_key) : PriorityIterationResult::BREAK; }, { return is_query_priority_overflow() ? iterate_func(locked_key, segment_metadata) : PriorityIterationResult::BREAK; },
cache_lock); cache_lock);
if (is_query_priority_overflow()) if (is_query_priority_overflow())
@ -615,8 +611,8 @@ bool FileCache::tryReserve(FileSegment & file_segment, size_t size)
}; };
main_priority->iterate( main_priority->iterate(
[&](const auto & entry, LockedKey & locked_key) [&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata)
{ return is_main_priority_overflow() ? iterate_func(entry, locked_key) : PriorityIterationResult::BREAK; }, { return is_main_priority_overflow() ? iterate_func(locked_key, segment_metadata) : PriorityIterationResult::BREAK; },
cache_lock); cache_lock);
if (is_main_priority_overflow()) if (is_main_priority_overflow())
@ -696,15 +692,11 @@ void FileCache::removeAllReleasable()
auto lock = cache_guard.lock(); auto lock = cache_guard.lock();
main_priority->iterate([&](const PriorityEntry & entry, LockedKey & locked_key) main_priority->iterate([&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata)
{ {
auto file_segment_metadata = locked_key.tryGetByOffset(entry.offset); if (segment_metadata->releasable())
if (!file_segment_metadata)
return PriorityIterationResult::REMOVE_AND_CONTINUE;
if (file_segment_metadata->releasable())
{ {
auto file_segment = file_segment_metadata->file_segment; auto file_segment = segment_metadata->file_segment;
locked_key.removeFileSegment(file_segment->offset(), file_segment->lock()); locked_key.removeFileSegment(file_segment->offset(), file_segment->lock());
return PriorityIterationResult::REMOVE_AND_CONTINUE; return PriorityIterationResult::REMOVE_AND_CONTINUE;
} }
@ -933,13 +925,9 @@ FileSegmentsHolderPtr FileCache::dumpQueue()
assertInitialized(); assertInitialized();
FileSegments file_segments; FileSegments file_segments;
main_priority->iterate([&](const PriorityEntry & entry, LockedKey & locked_key) main_priority->iterate([&](LockedKey &, FileSegmentMetadataPtr segment_metadata)
{ {
auto file_segment_metadata = locked_key.tryGetByOffset(entry.offset); file_segments.push_back(FileSegment::getSnapshot(segment_metadata->file_segment));
if (!file_segment_metadata)
return PriorityIterationResult::REMOVE_AND_CONTINUE;
file_segments.push_back(FileSegment::getSnapshot(file_segment_metadata->file_segment));
return PriorityIterationResult::CONTINUE; return PriorityIterationResult::CONTINUE;
}, cache_guard.lock()); }, cache_guard.lock());

View File

@ -18,15 +18,13 @@
#include <Interpreters/Cache/FileSegment.h> #include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/Metadata.h> #include <Interpreters/Cache/Metadata.h>
#include <Interpreters/Cache/QueryLimit.h> #include <Interpreters/Cache/QueryLimit.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <filesystem> #include <filesystem>
namespace DB namespace DB
{ {
struct LockedKey;
using LockedKeyPtr = std::shared_ptr<LockedKey>;
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;

View File

@ -0,0 +1,25 @@
#include <list>
namespace DB
{
class FileCache;
using FileCachePtr = std::shared_ptr<FileCache>;
class IFileCachePriority;
using FileCachePriorityPtr = std::unique_ptr<IFileCachePriority>;
class FileSegment;
using FileSegmentPtr = std::shared_ptr<FileSegment>;
using FileSegments = std::list<FileSegmentPtr>;
struct FileSegmentMetadata;
using FileSegmentMetadataPtr = std::shared_ptr<FileSegmentMetadata>;
struct LockedKey;
using LockedKeyPtr = std::shared_ptr<LockedKey>;
struct KeyMetadata;
using KeyMetadataPtr = std::shared_ptr<KeyMetadata>;
}

View File

@ -380,10 +380,11 @@ FileSegment::State FileSegment::wait(size_t offset)
chassert(!getDownloaderUnlocked(lock).empty()); chassert(!getDownloaderUnlocked(lock).empty());
chassert(!isDownloaderUnlocked(lock)); chassert(!isDownloaderUnlocked(lock));
cv.wait_for(lock, std::chrono::seconds(60), [&, this]() [[maybe_unused]] auto ok = cv.wait_for(lock, std::chrono::seconds(60), [&, this]()
{ {
return download_state != State::DOWNLOADING || offset < getCurrentWriteOffset(true); return download_state != State::DOWNLOADING || offset < getCurrentWriteOffset(true);
}); });
chassert(ok);
} }
return download_state; return download_state;

View File

@ -11,7 +11,7 @@
#include <IO/OpenedFileCache.h> #include <IO/OpenedFileCache.h>
#include <base/getThreadId.h> #include <base/getThreadId.h>
#include <Interpreters/Cache/IFileCachePriority.h> #include <Interpreters/Cache/IFileCachePriority.h>
#include <list> #include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <queue> #include <queue>
@ -25,18 +25,8 @@ extern const Metric CacheFileSegments;
namespace DB namespace DB
{ {
class FileCache;
class ReadBufferFromFileBase; class ReadBufferFromFileBase;
class FileSegment;
using FileSegmentPtr = std::shared_ptr<FileSegment>;
using FileSegments = std::list<FileSegmentPtr>;
struct FileSegmentMetadata;
struct LockedKey;
using LockedKeyPtr = std::shared_ptr<LockedKey>;
struct KeyMetadata;
using KeyMetadataPtr = std::shared_ptr<KeyMetadata>;
/* /*
* FileSegmentKind is used to specify the eviction policy for file segments. * FileSegmentKind is used to specify the eviction policy for file segments.
*/ */

View File

@ -6,21 +6,11 @@
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Interpreters/Cache/FileCacheKey.h> #include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/Guards.h> #include <Interpreters/Cache/Guards.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h>
namespace DB namespace DB
{ {
class IFileCachePriority;
using FileCachePriorityPtr = std::unique_ptr<IFileCachePriority>;
struct KeyMetadata;
using KeyMetadataPtr = std::shared_ptr<KeyMetadata>;
struct LockedKey;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// IFileCachePriority is used to maintain the priority of cached data. /// IFileCachePriority is used to maintain the priority of cached data.
class IFileCachePriority : private boost::noncopyable class IFileCachePriority : private boost::noncopyable
{ {
@ -71,7 +61,7 @@ public:
CONTINUE, CONTINUE,
REMOVE_AND_CONTINUE, REMOVE_AND_CONTINUE,
}; };
using IterateFunc = std::function<IterationResult(const Entry &, LockedKey &)>; using IterateFunc = std::function<IterationResult(LockedKey &, FileSegmentMetadataPtr)>;
IFileCachePriority(size_t max_size_, size_t max_elements_) : max_size(max_size_), max_elements(max_elements_) {} IFileCachePriority(size_t max_size_, size_t max_elements_) : max_size(max_size_), max_elements(max_elements_) {}

View File

@ -102,7 +102,22 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock &
continue; continue;
} }
auto result = func(*it, *locked_key); auto metadata = locked_key->tryGetByOffset(it->offset);
if (!metadata)
{
it = remove(it);
continue;
}
if (metadata->size() != it->size)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Mismatch of file segment size in file segment metadata and priority queue: {} != {}",
it->size, metadata->size());
}
auto result = func(*locked_key, metadata);
switch (result) switch (result)
{ {
case IterationResult::BREAK: case IterationResult::BREAK:

View File

@ -4,13 +4,10 @@
#include <Interpreters/Cache/IFileCachePriority.h> #include <Interpreters/Cache/IFileCachePriority.h>
#include <Interpreters/Cache/FileCacheKey.h> #include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/FileSegment.h> #include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h>
namespace DB namespace DB
{ {
class FileSegment;
using FileSegmentPtr = std::shared_ptr<FileSegment>;
struct LockedKey;
using LockedKeyPtr = std::shared_ptr<LockedKey>;
struct CleanupQueue; struct CleanupQueue;
using CleanupQueuePtr = std::shared_ptr<CleanupQueue>; using CleanupQueuePtr = std::shared_ptr<CleanupQueue>;

View File

@ -19,7 +19,6 @@
#include <Coordination/KeeperDispatcher.h> #include <Coordination/KeeperDispatcher.h>
#include <Compression/ICompressionCodec.h> #include <Compression/ICompressionCodec.h>
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <Core/ServerSettings.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>