From 20a2e786c99bf7ce700bce4498916477100273f7 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 18 Aug 2022 14:21:20 +0200 Subject: [PATCH 1/8] Fix bug with multiple TTL merges on expired column --- .../MergeTree/IMergedBlockOutputStream.cpp | 13 +++++++--- .../MergeTree/MergedBlockOutputStream.cpp | 2 +- .../02403_ttl_column_multiple_times.reference | 3 +++ .../02403_ttl_column_multiple_times.sql | 24 +++++++++++++++++++ 4 files changed, 38 insertions(+), 4 deletions(-) create mode 100644 tests/queries/0_stateless/02403_ttl_column_multiple_times.reference create mode 100644 tests/queries/0_stateless/02403_ttl_column_multiple_times.sql diff --git a/src/Storages/MergeTree/IMergedBlockOutputStream.cpp b/src/Storages/MergeTree/IMergedBlockOutputStream.cpp index 31c6a635b18..7bfc6d12d1d 100644 --- a/src/Storages/MergeTree/IMergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/IMergedBlockOutputStream.cpp @@ -79,10 +79,17 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart( } /// Remove files on disk and checksums - for (const String & removed_file : remove_files) + for (auto itr = remove_files.begin(); itr != remove_files.end();) { - if (checksums.files.contains(removed_file)) - checksums.files.erase(removed_file); + if (checksums.files.contains(*itr)) + { + checksums.files.erase(*itr); + } + else /// If we have no file in checksums it doesn't exist on disk + { + LOG_TRACE(storage.log, "Files {} doesn't exist in checksums so it doesn't exist on disk, will not try to remove it", *itr); + itr = remove_files.erase(itr); + } } /// Remove columns from columns array diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 06c5aac8ae3..a5bc189e42f 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -94,7 +94,7 @@ void MergedBlockOutputStream::Finalizer::Impl::finish() { writer.finish(sync); - for (const auto & file_name: files_to_remove_after_finish) + for (const auto & file_name : files_to_remove_after_finish) data_part_storage_builder->removeFile(file_name); for (auto & file : written_files) diff --git a/tests/queries/0_stateless/02403_ttl_column_multiple_times.reference b/tests/queries/0_stateless/02403_ttl_column_multiple_times.reference new file mode 100644 index 00000000000..5695a080619 --- /dev/null +++ b/tests/queries/0_stateless/02403_ttl_column_multiple_times.reference @@ -0,0 +1,3 @@ +2020-10-01 144 +2020-10-01 0 +2020-10-01 0 diff --git a/tests/queries/0_stateless/02403_ttl_column_multiple_times.sql b/tests/queries/0_stateless/02403_ttl_column_multiple_times.sql new file mode 100644 index 00000000000..3c1023a9988 --- /dev/null +++ b/tests/queries/0_stateless/02403_ttl_column_multiple_times.sql @@ -0,0 +1,24 @@ +DROP TABLE IF EXISTS ttl_table; + +CREATE TABLE ttl_table +( + EventDate Date, + Longitude Float64 TTL EventDate + toIntervalWeek(2) +) +ENGINE = MergeTree() +ORDER BY EventDate +SETTINGS vertical_merge_algorithm_min_rows_to_activate=1, vertical_merge_algorithm_min_columns_to_activate=1; + +INSERT INTO ttl_table VALUES(toDate('2020-10-01'), 144); + +SELECT * FROM ttl_table; + +OPTIMIZE TABLE ttl_table FINAL; + +SELECT * FROM ttl_table; + +OPTIMIZE TABLE ttl_table FINAL; + +SELECT * FROM ttl_table; + +DROP TABLE IF EXISTS ttl_table; From a2e08299f89560b21f9a3224cc0a36c87482100c Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 18 Aug 2022 15:36:55 +0200 Subject: [PATCH 2/8] Update 
src/Storages/MergeTree/IMergedBlockOutputStream.cpp Co-authored-by: Antonio Andelic --- src/Storages/MergeTree/IMergedBlockOutputStream.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Storages/MergeTree/IMergedBlockOutputStream.cpp b/src/Storages/MergeTree/IMergedBlockOutputStream.cpp index 7bfc6d12d1d..9be49a9bba4 100644 --- a/src/Storages/MergeTree/IMergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/IMergedBlockOutputStream.cpp @@ -84,6 +84,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart( if (checksums.files.contains(*itr)) { checksums.files.erase(*itr); + ++itr; } else /// If we have no file in checksums it doesn't exist on disk { From 9ec164fc070171adac412eaf3742b451ab307788 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 19 Aug 2022 11:36:33 +0200 Subject: [PATCH 3/8] Update 02403_ttl_column_multiple_times.sql --- tests/queries/0_stateless/02403_ttl_column_multiple_times.sql | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/queries/0_stateless/02403_ttl_column_multiple_times.sql b/tests/queries/0_stateless/02403_ttl_column_multiple_times.sql index 3c1023a9988..a1114eb15b1 100644 --- a/tests/queries/0_stateless/02403_ttl_column_multiple_times.sql +++ b/tests/queries/0_stateless/02403_ttl_column_multiple_times.sql @@ -9,10 +9,14 @@ ENGINE = MergeTree() ORDER BY EventDate SETTINGS vertical_merge_algorithm_min_rows_to_activate=1, vertical_merge_algorithm_min_columns_to_activate=1; +SYSTEM STOP MERGES ttl_table; + INSERT INTO ttl_table VALUES(toDate('2020-10-01'), 144); SELECT * FROM ttl_table; +SYSTEM START MERGES ttl_table; + OPTIMIZE TABLE ttl_table FINAL; SELECT * FROM ttl_table; From 4de309718bc14bbce87d9331c10aa69e98dcc1fd Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 18 Aug 2022 17:36:04 +0200 Subject: [PATCH 4/8] tests/stress: use --privileged over --cap-add syslog to obtain dmesg Signed-off-by: Azat Khuzhin --- tests/ci/stress_check.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ci/stress_check.py b/tests/ci/stress_check.py index 497df013cf4..e644eef3bc8 100644 --- a/tests/ci/stress_check.py +++ b/tests/ci/stress_check.py @@ -34,7 +34,7 @@ def get_run_command( # a static link, don't use S3_URL or S3_DOWNLOAD "-e S3_URL='https://s3.amazonaws.com/clickhouse-datasets' " # For dmesg - "--cap-add syslog " + "--privileged " f"--volume={build_path}:/package_folder " f"--volume={result_folder}:/test_output " f"--volume={repo_tests_path}:/usr/share/clickhouse-test " From 50bddc43dc29e308a420d99d2fd191b25dfe7306 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 19 Aug 2022 13:31:57 +0200 Subject: [PATCH 5/8] tests/stress: ignore NETLINK_ERROR from checkPermissionsImpl Since now with --privileged it has CAP_SYS_ADMIN and tries to communicate via netlink. 
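For context: checkPermissionsImpl() probes whether per-thread taskstats can be
read, and that probe is essentially a generic-netlink round-trip. A minimal
sketch of that kind of check (illustrative only, not the exact ClickHouse
implementation; names and error handling are simplified):

    // Hypothetical probe: can a generic-netlink socket be opened at all?
    // The taskstats query sent over it is what needs capabilities such as
    // CAP_NET_ADMIN; under --privileged the socket opens and the query path
    // actually runs, so failures now surface as NETLINK_ERROR instead of
    // the whole path being skipped.
    #include <sys/socket.h>
    #include <linux/netlink.h>
    #include <unistd.h>

    static bool canTalkGenericNetlink()
    {
        int fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
        if (fd < 0)
            return false; // no netlink support or no permission
        close(fd);
        return true;
    }

Hence the new NETLINK_ERROR entry in the stress check's ignore list below.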
Signed-off-by: Azat Khuzhin --- docker/test/stress/run.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/docker/test/stress/run.sh b/docker/test/stress/run.sh index c6dcc4a79ae..5dc5ffa2f21 100755 --- a/docker/test/stress/run.sh +++ b/docker/test/stress/run.sh @@ -387,6 +387,7 @@ else -e "TABLE_IS_READ_ONLY" \ -e "Code: 1000, e.code() = 111, Connection refused" \ -e "UNFINISHED" \ + -e "NETLINK_ERROR" \ -e "Renaming unexpected part" \ -e "PART_IS_TEMPORARILY_LOCKED" \ -e "and a merge is impossible: we didn't find" \ From e64166411580973d371ed5b3df3da7af4743cefa Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 19 Aug 2022 18:15:18 +0200 Subject: [PATCH 6/8] Refactor --- src/Common/FileCache.cpp | 262 +++++++++++++++++++-------------------- src/Common/FileCache.h | 242 +++++++++++++++++++----------------- 2 files changed, 253 insertions(+), 251 deletions(-) diff --git a/src/Common/FileCache.cpp b/src/Common/FileCache.cpp index 0f2c4559177..1aa8a25bb79 100644 --- a/src/Common/FileCache.cpp +++ b/src/Common/FileCache.cpp @@ -30,12 +30,12 @@ FileCache::FileCache( , max_element_size(cache_settings_.max_elements) , max_file_segment_size(cache_settings_.max_file_segment_size) , allow_persistent_files(cache_settings_.do_not_evict_index_and_mark_files) + , enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold) , enable_filesystem_query_cache_limit(cache_settings_.enable_filesystem_query_cache_limit) + , log(&Poco::Logger::get("FileCache")) , main_priority(std::make_unique()) , stash_priority(std::make_unique()) , max_stash_element_size(cache_settings_.max_elements) - , enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold) - , log(&Poco::Logger::get("FileCache")) { } @@ -77,132 +77,6 @@ void FileCache::assertInitialized() const throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized"); } -FileCache::QueryContextPtr FileCache::getCurrentQueryContext(std::lock_guard & cache_lock) -{ - if (!isQueryInitialized()) - return nullptr; - - return getQueryContext(std::string(CurrentThread::getQueryId()), cache_lock); -} - -FileCache::QueryContextPtr FileCache::getQueryContext(const String & query_id, std::lock_guard & /* cache_lock */) -{ - auto query_iter = query_map.find(query_id); - return (query_iter == query_map.end()) ? 
nullptr : query_iter->second; -} - -void FileCache::removeQueryContext(const String & query_id) -{ - std::lock_guard cache_lock(mutex); - auto query_iter = query_map.find(query_id); - - if (query_iter == query_map.end()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Attempt to release query context that does not exist (query_id: {})", - query_id); - } - - query_map.erase(query_iter); -} - -FileCache::QueryContextPtr FileCache::getOrSetQueryContext( - const String & query_id, const ReadSettings & settings, std::lock_guard & cache_lock) -{ - if (query_id.empty()) - return nullptr; - - auto context = getQueryContext(query_id, cache_lock); - if (context) - return context; - - auto query_context = std::make_shared(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache); - auto query_iter = query_map.emplace(query_id, query_context).first; - return query_iter->second; -} - -FileCache::QueryContextHolder FileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings) -{ - std::lock_guard cache_lock(mutex); - - if (!enable_filesystem_query_cache_limit || settings.max_query_cache_size == 0) - return {}; - - /// if enable_filesystem_query_cache_limit is true, and max_query_cache_size large than zero, - /// we create context query for current query. - auto context = getOrSetQueryContext(query_id, settings, cache_lock); - return QueryContextHolder(query_id, this, context); -} - -void FileCache::QueryContext::remove(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock) -{ - if (cache_size < size) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size"); - - if (!skip_download_if_exceeds_query_cache) - { - auto record = records.find({key, offset}); - if (record != records.end()) - { - record->second->removeAndGetNext(cache_lock); - records.erase({key, offset}); - } - } - cache_size -= size; -} - -void FileCache::QueryContext::reserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock) -{ - if (cache_size + size > max_cache_size) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Reserved cache size exceeds the remaining cache size (key: {}, offset: {})", - key.toString(), offset); - } - - if (!skip_download_if_exceeds_query_cache) - { - auto record = records.find({key, offset}); - if (record == records.end()) - { - auto queue_iter = priority->add(key, offset, 0, cache_lock); - record = records.insert({{key, offset}, queue_iter}).first; - } - record->second->incrementSize(size, cache_lock); - } - cache_size += size; -} - -void FileCache::QueryContext::use(const Key & key, size_t offset, std::lock_guard & cache_lock) -{ - if (skip_download_if_exceeds_query_cache) - return; - - auto record = records.find({key, offset}); - if (record != records.end()) - record->second->use(cache_lock); -} - -FileCache::QueryContextHolder::QueryContextHolder( - const String & query_id_, - FileCache * cache_, - FileCache::QueryContextPtr context_) - : query_id(query_id_) - , cache(cache_) - , context(context_) -{ -} - -FileCache::QueryContextHolder::~QueryContextHolder() -{ - /// If only the query_map and the current holder hold the context_query, - /// the query has been completed and the query_context is released. 
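/// (Counting the references: one std::shared_ptr lives in query_map and one
/// in this holder, so use_count() == 2 means nothing else still points at
/// the per-query context and it is safe to drop it.)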
- if (context && context.use_count() == 2) - cache->removeQueryContext(query_id); -} - void FileCache::initialize() { std::lock_guard cache_lock(mutex); @@ -1222,12 +1096,6 @@ size_t FileCache::getUsedCacheSizeUnlocked(std::lock_guard & cache_l return main_priority->getCacheSize(cache_lock); } -size_t FileCache::getAvailableCacheSize() const -{ - std::lock_guard cache_lock(mutex); - return getAvailableCacheSizeUnlocked(cache_lock); -} - size_t FileCache::getAvailableCacheSizeUnlocked(std::lock_guard & cache_lock) const { return max_size - getUsedCacheSizeUnlocked(cache_lock); @@ -1346,4 +1214,130 @@ void FileCache::assertPriorityCorrectness(std::lock_guard & cache_lo assert(main_priority->getElementsNum(cache_lock) <= max_element_size); } +FileCache::QueryContextPtr FileCache::getCurrentQueryContext(std::lock_guard & cache_lock) +{ + if (!isQueryInitialized()) + return nullptr; + + return getQueryContext(std::string(CurrentThread::getQueryId()), cache_lock); +} + +FileCache::QueryContextPtr FileCache::getQueryContext(const String & query_id, std::lock_guard & /* cache_lock */) +{ + auto query_iter = query_map.find(query_id); + return (query_iter == query_map.end()) ? nullptr : query_iter->second; +} + +void FileCache::removeQueryContext(const String & query_id) +{ + std::lock_guard cache_lock(mutex); + auto query_iter = query_map.find(query_id); + + if (query_iter == query_map.end()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Attempt to release query context that does not exist (query_id: {})", + query_id); + } + + query_map.erase(query_iter); +} + +FileCache::QueryContextPtr FileCache::getOrSetQueryContext( + const String & query_id, const ReadSettings & settings, std::lock_guard & cache_lock) +{ + if (query_id.empty()) + return nullptr; + + auto context = getQueryContext(query_id, cache_lock); + if (context) + return context; + + auto query_context = std::make_shared(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache); + auto query_iter = query_map.emplace(query_id, query_context).first; + return query_iter->second; +} + +FileCache::QueryContextHolder FileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings) +{ + std::lock_guard cache_lock(mutex); + + if (!enable_filesystem_query_cache_limit || settings.max_query_cache_size == 0) + return {}; + + /// if enable_filesystem_query_cache_limit is true, and max_query_cache_size large than zero, + /// we create context query for current query. 
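/// (That is: the per-query limit feature is enabled globally and this query
/// declared a nonzero cache budget, so it gets its own accounting context;
/// otherwise the early return above already produced an empty holder.)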
+ auto context = getOrSetQueryContext(query_id, settings, cache_lock); + return QueryContextHolder(query_id, this, context); +} + +void FileCache::QueryContext::remove(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock) +{ + if (cache_size < size) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size"); + + if (!skip_download_if_exceeds_query_cache) + { + auto record = records.find({key, offset}); + if (record != records.end()) + { + record->second->removeAndGetNext(cache_lock); + records.erase({key, offset}); + } + } + cache_size -= size; +} + +void FileCache::QueryContext::reserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock) +{ + if (cache_size + size > max_cache_size) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Reserved cache size exceeds the remaining cache size (key: {}, offset: {})", + key.toString(), offset); + } + + if (!skip_download_if_exceeds_query_cache) + { + auto record = records.find({key, offset}); + if (record == records.end()) + { + auto queue_iter = priority->add(key, offset, 0, cache_lock); + record = records.insert({{key, offset}, queue_iter}).first; + } + record->second->incrementSize(size, cache_lock); + } + cache_size += size; +} + +void FileCache::QueryContext::use(const Key & key, size_t offset, std::lock_guard & cache_lock) +{ + if (skip_download_if_exceeds_query_cache) + return; + + auto record = records.find({key, offset}); + if (record != records.end()) + record->second->use(cache_lock); +} + +FileCache::QueryContextHolder::QueryContextHolder( + const String & query_id_, + FileCache * cache_, + FileCache::QueryContextPtr context_) + : query_id(query_id_) + , cache(cache_) + , context(context_) +{ +} + +FileCache::QueryContextHolder::~QueryContextHolder() +{ + /// If only the query_map and the current holder hold the context_query, + /// the query has been completed and the query_context is released. + if (context && context.use_count() == 2) + cache->removeQueryContext(query_id); +} + } diff --git a/src/Common/FileCache.h b/src/Common/FileCache.h index 5b368329c88..1690690d102 100644 --- a/src/Common/FileCache.h +++ b/src/Common/FileCache.h @@ -23,13 +23,17 @@ namespace DB { /// Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments. -/// Different caching algorithms are implemented based on IFileCachePriority. +/// Different caching algorithms are implemented using IFileCachePriority. class FileCache : private boost::noncopyable { - friend class FileSegment; - friend class IFileCachePriority; - friend struct FileSegmentsHolder; - friend class FileSegmentRangeWriter; + +friend class FileSegment; +friend class IFileCachePriority; +friend struct FileSegmentsHolder; +friend class FileSegmentRangeWriter; + +struct QueryContext; +using QueryContextPtr = std::shared_ptr; public: using Key = DB::FileCacheKey; @@ -41,25 +45,8 @@ public: /// Restore cache from local filesystem. void initialize(); - void removeIfExists(const Key & key); - - void removeIfReleasable(); - - static bool isReadOnly(); - - /// Cache capacity in bytes. 
- size_t capacity() const { return max_size; } - - static Key hash(const String & path); - - String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const; - - String getPathInLocalCache(const Key & key) const; - const String & getBasePath() const { return cache_base_path; } - std::vector tryGetCachePaths(const Key & key); - /** * Given an `offset` and `size` representing [offset, offset + size) bytes interval, * return list of cached non-overlapping non-empty @@ -84,6 +71,28 @@ public: */ FileSegmentsHolder get(const Key & key, size_t offset, size_t size); + /// Remove files by `key`. Removes files which might be used at the moment. + void removeIfExists(const Key & key); + + /// Remove files by `key`. Will not remove files which are used at the moment. + void removeIfReleasable(); + + static Key hash(const String & path); + + String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const; + + String getPathInLocalCache(const Key & key) const; + + std::vector tryGetCachePaths(const Key & key); + + size_t capacity() const { return max_size; } + + size_t getUsedCacheSize() const; + + size_t getFileSegmentsNum() const; + + static bool isReadOnly(); + /** * Create a file segment of exactly requested size with EMPTY state. * Throw exception if requested size exceeds max allowed file segment size. @@ -102,92 +111,6 @@ public: /// For debug. String dumpStructure(const Key & key); - size_t getUsedCacheSize() const; - - size_t getFileSegmentsNum() const; - -private: - String cache_base_path; - size_t max_size; - size_t max_element_size; - size_t max_file_segment_size; - bool allow_persistent_files; - - bool is_initialized = false; - - mutable std::mutex mutex; - - bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); - - void remove(Key key, size_t offset, std::lock_guard & cache_lock, std::lock_guard & segment_lock); - - bool isLastFileSegmentHolder( - const Key & key, size_t offset, std::lock_guard & cache_lock, std::lock_guard & segment_lock); - - void reduceSizeToDownloaded( - const Key & key, size_t offset, std::lock_guard & cache_lock, std::lock_guard & /* segment_lock */); - - void assertInitialized() const; - - using AccessKeyAndOffset = std::pair; - struct KeyAndOffsetHash - { - std::size_t operator()(const AccessKeyAndOffset & key) const - { - return std::hash()(key.first.key) ^ std::hash()(key.second); - } - }; - - using FileCacheRecords = std::unordered_map; - - /// Used to track and control the cache access of each query. - /// Through it, we can realize the processing of different queries by the cache layer. 
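/// (In plainer terms: the cache keeps a small bookkeeping structure per
/// running query, so each query's reservations can be tracked and capped
/// independently of the cache-wide state.)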
- struct QueryContext - { - FileCacheRecords records; - FileCachePriorityPtr priority; - - size_t cache_size = 0; - size_t max_cache_size; - - bool skip_download_if_exceeds_query_cache; - - QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_) - : max_cache_size(max_cache_size_), skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_) - { - } - - void remove(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); - - void reserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); - - void use(const Key & key, size_t offset, std::lock_guard & cache_lock); - - size_t getMaxCacheSize() const { return max_cache_size; } - - size_t getCacheSize() const { return cache_size; } - - FileCachePriorityPtr getPriority() { return priority; } - - bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; } - }; - - using QueryContextPtr = std::shared_ptr; - using QueryContextMap = std::unordered_map; - - QueryContextMap query_map; - - bool enable_filesystem_query_cache_limit; - - QueryContextPtr getCurrentQueryContext(std::lock_guard & cache_lock); - - QueryContextPtr getQueryContext(const String & query_id, std::lock_guard & cache_lock); - - void removeQueryContext(const String & query_id); - - QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard &); - -public: /// Save a query context information, and adopt different cache policies /// for different queries through the context cache layer. struct QueryContextHolder : private boost::noncopyable @@ -206,6 +129,43 @@ public: QueryContextHolder getQueryContextHolder(const String & query_id, const ReadSettings & settings); private: + String cache_base_path; + + size_t max_size; + size_t max_element_size; + size_t max_file_segment_size; + + bool allow_persistent_files; + size_t enable_cache_hits_threshold; + bool enable_filesystem_query_cache_limit; + + Poco::Logger * log; + bool is_initialized = false; + + mutable std::mutex mutex; + + bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); + + void remove( + Key key, + size_t offset, + std::lock_guard & cache_lock, + std::lock_guard & segment_lock); + + bool isLastFileSegmentHolder( + const Key & key, + size_t offset, + std::lock_guard & cache_lock, + std::lock_guard & segment_lock); + + void reduceSizeToDownloaded( + const Key & key, + size_t offset, + std::lock_guard & cache_lock, + std::lock_guard & segment_lock); + + void assertInitialized() const; + struct FileSegmentCell : private boost::noncopyable { FileSegmentPtr file_segment; @@ -223,24 +183,30 @@ private: FileSegmentCell(FileSegmentPtr file_segment_, FileCache * cache, std::lock_guard & cache_lock); FileSegmentCell(FileSegmentCell && other) noexcept - : file_segment(std::move(other.file_segment)), queue_iterator(other.queue_iterator) + : file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {} + }; + + using AccessKeyAndOffset = std::pair; + struct KeyAndOffsetHash + { + std::size_t operator()(const AccessKeyAndOffset & key) const { + return std::hash()(key.first.key) ^ std::hash()(key.second); } }; using FileSegmentsByOffset = std::map; using CachedFiles = std::unordered_map; + using FileCacheRecords = std::unordered_map; CachedFiles files; std::unique_ptr main_priority; FileCacheRecords stash_records; std::unique_ptr stash_priority; - size_t max_stash_element_size; - size_t 
enable_cache_hits_threshold; - Poco::Logger * log; + void loadCacheInfoIntoMemory(std::lock_guard & cache_lock); FileSegments getImpl(const Key & key, const FileSegment::Range & range, std::lock_guard & cache_lock); @@ -257,11 +223,11 @@ private: void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard & cache_lock) const; bool tryReserveForMainList( - const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard & cache_lock); - - size_t getAvailableCacheSize() const; - - void loadCacheInfoIntoMemory(std::lock_guard & cache_lock); + const Key & key, + size_t offset, + size_t size, + QueryContextPtr query_context, + std::lock_guard & cache_lock); FileSegments splitRangeIntoCells( const Key & key, @@ -289,6 +255,48 @@ private: void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard & cache_lock); + /// Used to track and control the cache access of each query. + /// Through it, we can realize the processing of different queries by the cache layer. + struct QueryContext + { + FileCacheRecords records; + FileCachePriorityPtr priority; + + size_t cache_size = 0; + size_t max_cache_size; + + bool skip_download_if_exceeds_query_cache; + + QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_) + : max_cache_size(max_cache_size_) + , skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_) {} + + size_t getMaxCacheSize() const { return max_cache_size; } + + size_t getCacheSize() const { return cache_size; } + + FileCachePriorityPtr getPriority() const { return priority; } + + bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; } + + void remove(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); + + void reserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); + + void use(const Key & key, size_t offset, std::lock_guard & cache_lock); + }; + + using QueryContextMap = std::unordered_map; + QueryContextMap query_map; + + QueryContextPtr getCurrentQueryContext(std::lock_guard & cache_lock); + + QueryContextPtr getQueryContext(const String & query_id, std::lock_guard & cache_lock); + + void removeQueryContext(const String & query_id); + + QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard &); + public: void assertCacheCorrectness(const Key & key, std::lock_guard & cache_lock); From 4a453f0776549c2970e8309c61b849f41daa1120 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Fri, 19 Aug 2022 18:08:13 +0000 Subject: [PATCH 7/8] Fix obvious trash --- src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index 08142bd8dd1..e2cd797ab92 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -703,11 +703,11 @@ size_t MergeTreeBaseSelectProcessor::estimateMaxBatchSizeForHugeRanges() { /// This is an empirical number and it is so, /// because we have an adaptive granularity by default. - const size_t average_granule_size_bytes = 8UL * 1024 * 1024 * 10; // 10 MiB + const size_t average_granule_size_bytes = 1024 * 1024 * 10; // 10 MiB /// We want to have one RTT per one gigabyte of data read from disk /// this could be configurable. 
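/// (The stray 8UL factor is the bug here: 8UL * 1024 * 1024 * 10 is 80 MiB,
/// not the 10 MiB the comment claims, and the constant below was likewise
/// 8 GiB rather than 1 GiB. Dropping the factor restores the stated values.)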
- const size_t max_size_for_one_request = 8UL * 1024 * 1024 * 1024; // 1 GiB + const size_t max_size_for_one_request = 1024 * 1024 * 1024; // 1 GiB size_t sum_average_marks_size = 0; /// getColumnSize is not fully implemented for compact parts From 43227e76eb14d6c28bc12d29819c8e69e66e1908 Mon Sep 17 00:00:00 2001 From: Yakov Olkhovskiy <99031427+yakov-olkhovskiy@users.noreply.github.com> Date: Fri, 19 Aug 2022 14:55:02 -0400 Subject: [PATCH 8/8] use input token instead of env var --- .github/workflows/tags_stable.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/tags_stable.yml b/.github/workflows/tags_stable.yml index e0489ad1210..9711f7688cb 100644 --- a/.github/workflows/tags_stable.yml +++ b/.github/workflows/tags_stable.yml @@ -53,10 +53,9 @@ jobs: git diff HEAD - name: Create Pull Request uses: peter-evans/create-pull-request@v3 - env: - GITHUB_TOKEN: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }} with: author: "robot-clickhouse " + token: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }} committer: "robot-clickhouse " commit-message: Update version_date.tsv and changelogs after ${{ env.GITHUB_TAG }} branch: auto/${{ env.GITHUB_TAG }}
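A note on this last change: peter-evans/create-pull-request takes its
credentials from the "token" input (falling back to the default workflow
token) and, at least per the action's documented inputs, does not read a
GITHUB_TOKEN environment variable. Setting the env var therefore had no
effect on which account created the pull request; passing the robot token
through "with:" does.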