From f1e95fb78bae190bb87e93704cf5f88c70cdccf4 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 22 Feb 2024 15:38:44 +0100 Subject: [PATCH 01/22] Add a way to force read-through cache for merges --- src/Core/Settings.h | 1 + src/Disks/IO/ReadBufferFromRemoteFSGather.cpp | 10 +- .../Cached/CachedObjectStorage.cpp | 14 -- .../Cached/CachedObjectStorage.h | 2 - src/IO/ReadSettings.h | 2 +- src/Interpreters/Cache/FileSegment.cpp | 3 +- src/Interpreters/Context.cpp | 1 + .../MergeTree/MergeTreeSequentialSource.cpp | 2 +- .../integration/test_filesystem_cache/test.py | 79 ++++++++ .../users.d/cache_on_write_operations.xml | 7 + .../force_read_through_cache_on_merge.xml | 7 + ...system_cache_on_write_operations.reference | 170 ++++++++++++++++++ ...41_filesystem_cache_on_write_operations.sh | 81 +++++---- 13 files changed, 317 insertions(+), 62 deletions(-) create mode 100644 tests/integration/test_filesystem_cache/users.d/cache_on_write_operations.xml create mode 100644 tests/integration/test_filesystem_cache/users.d/force_read_through_cache_on_merge.xml diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 433195af9c3..db060bf712d 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -769,6 +769,7 @@ class IColumn; M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \ M(Bool, enable_filesystem_cache_log, false, "Allows to record the filesystem caching log for each query", 0) \ M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "Allow to use the filesystem cache in passive mode - benefit from the existing cache entries, but don't put more entries into the cache. If you set this setting for heavy ad-hoc queries and leave it disabled for short real-time queries, this will allows to avoid cache threshing by too heavy queries and to improve the overall system efficiency.", 0) \ + M(Bool, force_read_through_cache_for_merges, false, "Force read-through cache for merges", 0) \ M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \ M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \ M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \ diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 0b3ecca3587..1da39c7011c 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -16,12 +16,10 @@ using namespace DB; namespace { -bool withCache(const ReadSettings & settings) -{ - return settings.remote_fs_cache && settings.enable_filesystem_cache - && (!CurrentThread::getQueryId().empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache - || !settings.avoid_readthrough_cache_outside_query_context); -} + bool withCache(const ReadSettings & settings) + { + return settings.remote_fs_cache && settings.enable_filesystem_cache; + } } namespace DB diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp index 1444f4c9c76..e3ab772e3b5 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp @@ -43,10 +43,6 @@ ReadSettings 
CachedObjectStorage::patchSettings(const ReadSettings & read_settin { ReadSettings modified_settings{read_settings}; modified_settings.remote_fs_cache = cache; - - if (!canUseReadThroughCache(read_settings)) - modified_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true; - return object_storage->patchSettings(modified_settings); } @@ -206,14 +202,4 @@ String CachedObjectStorage::getObjectsNamespace() const return object_storage->getObjectsNamespace(); } -bool CachedObjectStorage::canUseReadThroughCache(const ReadSettings & settings) -{ - if (!settings.avoid_readthrough_cache_outside_query_context) - return true; - - return CurrentThread::isInitialized() - && CurrentThread::get().getQueryContext() - && !CurrentThread::getQueryId().empty(); -} - } diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h index 437baead7be..961c2709efc 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h @@ -119,8 +119,6 @@ public: const FileCacheSettings & getCacheSettings() const { return cache_settings; } - static bool canUseReadThroughCache(const ReadSettings & settings); - #if USE_AZURE_BLOB_STORAGE std::shared_ptr getAzureBlobStorageClient() override { diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index c397689d6ad..2c79735317d 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -99,7 +99,7 @@ struct ReadSettings bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false; bool enable_filesystem_cache_log = false; /// Don't populate cache when the read is not part of query execution (e.g. background thread). - bool avoid_readthrough_cache_outside_query_context = true; + bool force_read_through_cache_merges = false; size_t filesystem_cache_segments_batch_size = 20; size_t filesystem_cache_max_download_size = (128UL * 1024 * 1024 * 1024); diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index 8bd89465917..7c0505889da 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include @@ -194,7 +195,7 @@ bool FileSegment::isDownloaded() const String FileSegment::getCallerId() { if (!CurrentThread::isInitialized() || CurrentThread::getQueryId().empty()) - return "None:" + toString(getThreadId()); + return fmt::format("None:{}:{}", getThreadName(), toString(getThreadId())); return std::string(CurrentThread::getQueryId()) + ":" + toString(getThreadId()); } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 55a4df10206..36b362e36bb 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -5079,6 +5079,7 @@ ReadSettings Context::getReadSettings() const res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache; res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log; res.filesystem_cache_segments_batch_size = settings.filesystem_cache_segments_batch_size; + res.force_read_through_cache_merges = settings.force_read_through_cache_for_merges; res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size; res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache; diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index 
d0fbc316024..e375e8b0a9f 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -151,7 +151,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( const auto & context = storage.getContext(); ReadSettings read_settings = context->getReadSettings(); - read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true; + read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = !read_settings.force_read_through_cache_merges; /// It does not make sense to use pthread_threadpool for background merges/mutations /// And also to preserve backward compatibility read_settings.local_fs_method = LocalFSReadMethod::pread; diff --git a/tests/integration/test_filesystem_cache/test.py b/tests/integration/test_filesystem_cache/test.py index eb5f896f7a9..c1ba6702dcf 100644 --- a/tests/integration/test_filesystem_cache/test.py +++ b/tests/integration/test_filesystem_cache/test.py @@ -19,6 +19,9 @@ def cluster(): main_configs=[ "config.d/storage_conf.xml", ], + user_configs=[ + "users.d/cache_on_write_operations.xml", + ], stay_alive=True, ) cluster.add_instance( @@ -35,6 +38,17 @@ def cluster(): ], stay_alive=True, ) + cluster.add_instance( + "node_force_read_through_cache_on_merge", + main_configs=[ + "config.d/storage_conf.xml", + ], + user_configs=[ + "users.d/force_read_through_cache_on_merge.xml", + "users.d/cache_on_write_operations.xml", + ], + stay_alive=True, + ) logging.info("Starting cluster...") cluster.start() @@ -323,3 +337,68 @@ def test_custom_cached_disk(cluster): "SELECT cache_path FROM system.disks WHERE name = 'custom_cached4'" ).strip() ) + + +def test_force_filesystem_cache_on_merges(cluster): + def test(node, forced_read_through_cache_on_merge): + node.query( + """ + DROP TABLE IF EXISTS test SYNC; + + CREATE TABLE test (key UInt32, value String) + Engine=MergeTree() + ORDER BY value + SETTINGS disk = disk( + type = cache, + path = 'force_cache_on_merges', + disk = 'hdd_blob', + max_file_segment_size = '1Ki', + cache_on_write_operations = 1, + boundary_alignment = '1Ki', + max_size = '10Gi', + max_elements = 10000000, + load_metadata_threads = 30); + + SYSTEM DROP FILESYSTEM CACHE; + INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000; + INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000; + """ + ) + assert int(node.query("SELECT count() FROM system.filesystem_cache")) > 0 + assert int(node.query("SELECT max(size) FROM system.filesystem_cache")) == 1024 + + write_count = int( + node.query( + "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" + ) + ) + assert write_count > 100000 + assert "" == node.query( + "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" + ) + + node.query("SYSTEM DROP FILESYSTEM CACHE") + node.query("OPTIMIZE TABLE test FINAL") + + new_write_count = int( + node.query( + "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" + ) + ) + assert new_write_count >= write_count + + if forced_read_through_cache_on_merge: + assert 100000 < int( + node.query( + "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" + ) + ) + else: + assert "" == node.query( + "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" + ) + + node = cluster.instances["node_force_read_through_cache_on_merge"] + test(node, True) + node = cluster.instances["node"] + test(node, False) diff --git 
a/tests/integration/test_filesystem_cache/users.d/cache_on_write_operations.xml b/tests/integration/test_filesystem_cache/users.d/cache_on_write_operations.xml new file mode 100644 index 00000000000..5de169edc1e --- /dev/null +++ b/tests/integration/test_filesystem_cache/users.d/cache_on_write_operations.xml @@ -0,0 +1,7 @@ + + + + 1 + + + diff --git a/tests/integration/test_filesystem_cache/users.d/force_read_through_cache_on_merge.xml b/tests/integration/test_filesystem_cache/users.d/force_read_through_cache_on_merge.xml new file mode 100644 index 00000000000..4d26a1a8bc7 --- /dev/null +++ b/tests/integration/test_filesystem_cache/users.d/force_read_through_cache_on_merge.xml @@ -0,0 +1,7 @@ + + + + 1 + + + diff --git a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference index 157837983f7..c03b928684b 100644 --- a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference +++ b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference @@ -1,62 +1,232 @@ Using storage policy: s3_cache +DROP TABLE IF EXISTS test_02241 +CREATE TABLE test_02241 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization = 1 +SYSTEM STOP MERGES test_02241 +SYSTEM DROP FILESYSTEM CACHE +SELECT file_segment_range_begin, file_segment_range_end, size, state + FROM + ( + SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path + FROM + ( + SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path + FROM system.remote_data_paths + ) AS data_paths + INNER JOIN + system.filesystem_cache AS caches + ON data_paths.cache_path = caches.cache_path + ) + WHERE endsWith(local_path, 'data.bin') + FORMAT Vertical +SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path 0 +SELECT count(), sum(size) FROM system.filesystem_cache 0 0 +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) +SELECT file_segment_range_begin, file_segment_range_end, size, state + FROM + ( + SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path + FROM + ( + SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path + FROM system.remote_data_paths + ) AS data_paths + INNER JOIN + system.filesystem_cache AS caches + ON data_paths.cache_path = caches.cache_path + ) + WHERE endsWith(local_path, 'data.bin') + FORMAT Vertical Row 1: ────── file_segment_range_begin: 0 file_segment_range_end: 745 size: 746 state: DOWNLOADED +SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path 8 +SELECT count(), sum(size) FROM system.filesystem_cache 8 1100 +SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0 0 +SELECT * FROM test_02241 FORMAT Null +SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0 2 +SELECT * FROM test_02241 FORMAT Null +SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0 2 +SELECT count(), sum(size) size FROM system.filesystem_cache 8 1100 +SYSTEM DROP FILESYSTEM CACHE +INSERT INTO 
test_02241 SELECT number, toString(number) FROM numbers(100, 200) +SELECT file_segment_range_begin, file_segment_range_end, size, state + FROM + ( + SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path + FROM + ( + SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path + FROM system.remote_data_paths + ) AS data_paths + INNER JOIN + system.filesystem_cache AS caches + ON data_paths.cache_path = caches.cache_path + ) + WHERE endsWith(local_path, 'data.bin') + FORMAT Vertical; Row 1: ────── file_segment_range_begin: 0 file_segment_range_end: 1659 size: 1660 state: DOWNLOADED +SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path 8 +SELECT count(), sum(size) FROM system.filesystem_cache 8 2014 +SELECT count(), sum(size) FROM system.filesystem_cache 8 2014 +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0 +SELECT count(), sum(size) FROM system.filesystem_cache 8 2014 +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000) +SELECT count(), sum(size) FROM system.filesystem_cache 24 84045 +SYSTEM START MERGES test_02241 +SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes' +85146 +SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes' +OPTIMIZE TABLE test_02241 FINAL +SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes' +251542 +SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes' +SELECT count(), sum(size) FROM system.filesystem_cache 32 167243 +ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100 +SELECT count(), sum(size) FROM system.filesystem_cache 41 250541 +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000) +SYSTEM FLUSH LOGS INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000) 0 +SELECT count() FROM test_02241 5010500 +SELECT count() FROM test_02241 WHERE value LIKE '%010%' 18816 Using storage policy: local_cache +DROP TABLE IF EXISTS test_02241 +CREATE TABLE test_02241 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='local_cache', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization = 1 +SYSTEM STOP MERGES test_02241 +SYSTEM DROP FILESYSTEM CACHE +SELECT file_segment_range_begin, file_segment_range_end, size, state + FROM + ( + SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path + FROM + ( + SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path + FROM system.remote_data_paths + ) AS data_paths + INNER JOIN + system.filesystem_cache AS caches + ON data_paths.cache_path = caches.cache_path + ) + WHERE endsWith(local_path, 'data.bin') + FORMAT Vertical +SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path 0 +SELECT count(), sum(size) FROM system.filesystem_cache 0 0 +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) +SELECT file_segment_range_begin, file_segment_range_end, size, state + FROM + ( + SELECT 
file_segment_range_begin, file_segment_range_end, size, state, local_path + FROM + ( + SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path + FROM system.remote_data_paths + ) AS data_paths + INNER JOIN + system.filesystem_cache AS caches + ON data_paths.cache_path = caches.cache_path + ) + WHERE endsWith(local_path, 'data.bin') + FORMAT Vertical Row 1: ────── file_segment_range_begin: 0 file_segment_range_end: 745 size: 746 state: DOWNLOADED +SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path 8 +SELECT count(), sum(size) FROM system.filesystem_cache 8 1100 +SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0 0 +SELECT * FROM test_02241 FORMAT Null +SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0 2 +SELECT * FROM test_02241 FORMAT Null +SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0 2 +SELECT count(), sum(size) size FROM system.filesystem_cache 8 1100 +SYSTEM DROP FILESYSTEM CACHE +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100, 200) +SELECT file_segment_range_begin, file_segment_range_end, size, state + FROM + ( + SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path + FROM + ( + SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path + FROM system.remote_data_paths + ) AS data_paths + INNER JOIN + system.filesystem_cache AS caches + ON data_paths.cache_path = caches.cache_path + ) + WHERE endsWith(local_path, 'data.bin') + FORMAT Vertical; Row 1: ────── file_segment_range_begin: 0 file_segment_range_end: 1659 size: 1660 state: DOWNLOADED +SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path 8 +SELECT count(), sum(size) FROM system.filesystem_cache 8 2014 +SELECT count(), sum(size) FROM system.filesystem_cache 8 2014 +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0 +SELECT count(), sum(size) FROM system.filesystem_cache 8 2014 +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000) +SELECT count(), sum(size) FROM system.filesystem_cache 24 84045 +SYSTEM START MERGES test_02241 +SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes' +81715476 +SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes' +OPTIMIZE TABLE test_02241 FINAL +SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes' +81881872 +SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes' +SELECT count(), sum(size) FROM system.filesystem_cache 32 167243 +ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100 +SELECT count(), sum(size) FROM system.filesystem_cache 41 250541 +INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000) +SYSTEM FLUSH LOGS INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000) 0 +SELECT count() FROM test_02241 5010500 +SELECT count() FROM test_02241 WHERE value LIKE '%010%' 18816 diff --git a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh 
b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh index 96f61cf61e8..2b237492e98 100755 --- a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh +++ b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh @@ -10,13 +10,13 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) for STORAGE_POLICY in 's3_cache' 'local_cache'; do echo "Using storage policy: $STORAGE_POLICY" - $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test_02241" - $CLICKHOUSE_CLIENT --query "CREATE TABLE test_02241 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='$STORAGE_POLICY', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization = 1" - $CLICKHOUSE_CLIENT --query "SYSTEM STOP MERGES test_02241" + $CLICKHOUSE_CLIENT --echo --query "DROP TABLE IF EXISTS test_02241" + $CLICKHOUSE_CLIENT --echo --query "CREATE TABLE test_02241 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='$STORAGE_POLICY', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization = 1" + $CLICKHOUSE_CLIENT --echo --query "SYSTEM STOP MERGES test_02241" - $CLICKHOUSE_CLIENT --query "SYSTEM DROP FILESYSTEM CACHE" + $CLICKHOUSE_CLIENT --echo --query "SYSTEM DROP FILESYSTEM CACHE" - $CLICKHOUSE_CLIENT -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state + $CLICKHOUSE_CLIENT --echo -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state FROM ( SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path @@ -32,12 +32,12 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do WHERE endsWith(local_path, 'data.bin') FORMAT Vertical" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path" - $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path" + $CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache" - $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)" + $CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)" - $CLICKHOUSE_CLIENT -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state + $CLICKHOUSE_CLIENT --echo -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state FROM ( SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path @@ -53,24 +53,24 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do WHERE endsWith(local_path, 'data.bin') FORMAT Vertical" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path" - $CLICKHOUSE_CLIENT 
--query "SELECT count(), sum(size) FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path" + $CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0" + $CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0" - $CLICKHOUSE_CLIENT --query "SELECT * FROM test_02241 FORMAT Null" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0" + $CLICKHOUSE_CLIENT --echo --query "SELECT * FROM test_02241 FORMAT Null" + $CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0" - $CLICKHOUSE_CLIENT --query "SELECT * FROM test_02241 FORMAT Null" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0" + $CLICKHOUSE_CLIENT --echo --query "SELECT * FROM test_02241 FORMAT Null" + $CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0" - $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) size FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) size FROM system.filesystem_cache" - $CLICKHOUSE_CLIENT --query "SYSTEM DROP FILESYSTEM CACHE" + $CLICKHOUSE_CLIENT --echo --query "SYSTEM DROP FILESYSTEM CACHE" - $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100, 200)" + $CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100, 200)" - $CLICKHOUSE_CLIENT -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state + $CLICKHOUSE_CLIENT --echo -n --query "SELECT file_segment_range_begin, file_segment_range_end, size, state FROM ( SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path @@ -86,27 +86,34 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do WHERE endsWith(local_path, 'data.bin') FORMAT Vertical;" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path" - $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path" + $CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache" - $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" - $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0" - $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache" + 
$CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0" + $CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache" - $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)" - $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000)" - $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)" + $CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000)" + $CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache" - $CLICKHOUSE_CLIENT --query "SYSTEM START MERGES test_02241" + $CLICKHOUSE_CLIENT --echo --query "SYSTEM START MERGES test_02241" - $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "OPTIMIZE TABLE test_02241 FINAL" - $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --echo --query "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" + $CLICKHOUSE_CLIENT --echo --query "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" - $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --mutations_sync=2 --query "ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100" - $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" - $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000)" + $CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "OPTIMIZE TABLE test_02241 FINAL" - $CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS" + $CLICKHOUSE_CLIENT --echo --query "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" + $CLICKHOUSE_CLIENT --echo --query "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" + + $CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache" + + $CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --mutations_sync=2 --query "ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100" + $CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000)" + + $CLICKHOUSE_CLIENT --echo --query "SYSTEM FLUSH LOGS" $CLICKHOUSE_CLIENT -n --query "SELECT query, ProfileEvents['RemoteFSReadBytes'] > 0 as remote_fs_read @@ -121,6 +128,6 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do DESC LIMIT 1" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM test_02241" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM test_02241 WHERE value LIKE '%010%'" + $CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM test_02241" + $CLICKHOUSE_CLIENT --echo --query "SELECT count() FROM test_02241 WHERE value LIKE 
'%010%'" done From 18741f122eabaeb7903f355958af1e1a88818e83 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 26 Feb 2024 12:42:13 +0800 Subject: [PATCH 02/22] Move a setting to server setting --- src/Core/ServerSettings.h | 2 ++ src/Core/Settings.h | 1 - src/Interpreters/Context.cpp | 2 +- .../config.d/force_read_through_cache_for_merges.xml | 3 +++ tests/integration/test_filesystem_cache/test.py | 2 +- .../users.d/force_read_through_cache_on_merge.xml | 7 ------- 6 files changed, 7 insertions(+), 10 deletions(-) create mode 100644 tests/integration/test_filesystem_cache/config.d/force_read_through_cache_for_merges.xml delete mode 100644 tests/integration/test_filesystem_cache/users.d/force_read_through_cache_on_merge.xml diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index de2a4e9b755..0283b98638f 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -103,6 +103,8 @@ namespace DB M(Bool, async_load_databases, false, "Enable asynchronous loading of databases and tables to speedup server startup. Queries to not yet loaded entity will be blocked until load is finished.", 0) \ M(Bool, display_secrets_in_show_and_select, false, "Allow showing secrets in SHOW and SELECT queries via a format setting and a grant", 0) \ \ + M(Bool, force_read_through_cache_for_merges, false, "Force read-through filesystem cache for merges", 0) \ + \ M(Seconds, keep_alive_timeout, DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT, "The number of seconds that ClickHouse waits for incoming requests before closing the connection.", 0) \ M(Seconds, replicated_fetches_http_connection_timeout, 0, "HTTP connection timeout for part fetch requests. Inherited from default profile `http_connection_timeout` if not set explicitly.", 0) \ M(Seconds, replicated_fetches_http_send_timeout, 0, "HTTP send timeout for part fetch requests. Inherited from default profile `http_send_timeout` if not set explicitly.", 0) \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index db060bf712d..433195af9c3 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -769,7 +769,6 @@ class IColumn; M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \ M(Bool, enable_filesystem_cache_log, false, "Allows to record the filesystem caching log for each query", 0) \ M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "Allow to use the filesystem cache in passive mode - benefit from the existing cache entries, but don't put more entries into the cache. 
If you set this setting for heavy ad-hoc queries and leave it disabled for short real-time queries, this will allows to avoid cache threshing by too heavy queries and to improve the overall system efficiency.", 0) \ - M(Bool, force_read_through_cache_for_merges, false, "Force read-through cache for merges", 0) \ M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \ M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \ M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 36b362e36bb..a974eaca067 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -5079,7 +5079,7 @@ ReadSettings Context::getReadSettings() const res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache; res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log; res.filesystem_cache_segments_batch_size = settings.filesystem_cache_segments_batch_size; - res.force_read_through_cache_merges = settings.force_read_through_cache_for_merges; + res.force_read_through_cache_merges = getServerSettings().force_read_through_cache_for_merges; res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size; res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache; diff --git a/tests/integration/test_filesystem_cache/config.d/force_read_through_cache_for_merges.xml b/tests/integration/test_filesystem_cache/config.d/force_read_through_cache_for_merges.xml new file mode 100644 index 00000000000..bb2a6e850a4 --- /dev/null +++ b/tests/integration/test_filesystem_cache/config.d/force_read_through_cache_for_merges.xml @@ -0,0 +1,3 @@ + + 1 + diff --git a/tests/integration/test_filesystem_cache/test.py b/tests/integration/test_filesystem_cache/test.py index c1ba6702dcf..f32fa4e9823 100644 --- a/tests/integration/test_filesystem_cache/test.py +++ b/tests/integration/test_filesystem_cache/test.py @@ -42,9 +42,9 @@ def cluster(): "node_force_read_through_cache_on_merge", main_configs=[ "config.d/storage_conf.xml", + "config.d/force_read_through_cache_for_merges.xml", ], user_configs=[ - "users.d/force_read_through_cache_on_merge.xml", "users.d/cache_on_write_operations.xml", ], stay_alive=True, diff --git a/tests/integration/test_filesystem_cache/users.d/force_read_through_cache_on_merge.xml b/tests/integration/test_filesystem_cache/users.d/force_read_through_cache_on_merge.xml deleted file mode 100644 index 4d26a1a8bc7..00000000000 --- a/tests/integration/test_filesystem_cache/users.d/force_read_through_cache_on_merge.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - 1 - - - From bf5affbe640976d2b73e12f5213a13baacf40619 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 26 Feb 2024 16:37:09 +0800 Subject: [PATCH 03/22] Fix test --- .../02241_filesystem_cache_on_write_operations.sh | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh index 2b237492e98..ee1d942a421 100755 --- a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh +++ 
b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh @@ -99,14 +99,8 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do $CLICKHOUSE_CLIENT --echo --query "SYSTEM START MERGES test_02241" - $CLICKHOUSE_CLIENT --echo --query "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" - $CLICKHOUSE_CLIENT --echo --query "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" - $CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --query "OPTIMIZE TABLE test_02241 FINAL" - $CLICKHOUSE_CLIENT --echo --query "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" - $CLICKHOUSE_CLIENT --echo --query "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" - $CLICKHOUSE_CLIENT --echo --query "SELECT count(), sum(size) FROM system.filesystem_cache" $CLICKHOUSE_CLIENT --echo --enable_filesystem_cache_on_write_operations=1 --mutations_sync=2 --query "ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100" From ac4af6a4ad3b67860eae79b2ed3320fc5981a954 Mon Sep 17 00:00:00 2001 From: avogar Date: Mon, 26 Feb 2024 19:58:49 +0000 Subject: [PATCH 04/22] Don't allow to set max_parallel_replicas to 0 as it doesn't make sense --- src/Client/ConnectionPoolWithFailover.cpp | 9 +++++++++ src/Client/HedgedConnectionsFactory.cpp | 3 +++ src/Client/HedgedConnectionsFactory.h | 2 +- src/Interpreters/InterpreterSelectQuery.cpp | 4 ++-- src/Planner/PlannerJoinTree.cpp | 4 ++-- .../03001_max_parallel_replicas_zero_value.reference | 0 .../03001_max_parallel_replicas_zero_value.sql | 5 +++++ 7 files changed, 22 insertions(+), 5 deletions(-) create mode 100644 tests/queries/0_stateless/03001_max_parallel_replicas_zero_value.reference create mode 100644 tests/queries/0_stateless/03001_max_parallel_replicas_zero_value.sql diff --git a/src/Client/ConnectionPoolWithFailover.cpp b/src/Client/ConnectionPoolWithFailover.cpp index 492fd4ae9e2..46b9741c812 100644 --- a/src/Client/ConnectionPoolWithFailover.cpp +++ b/src/Client/ConnectionPoolWithFailover.cpp @@ -191,11 +191,20 @@ std::vector ConnectionPoolWithFailover::g max_entries = nested_pools.size(); } else if (pool_mode == PoolMode::GET_ONE) + { max_entries = 1; + } else if (pool_mode == PoolMode::GET_MANY) + { + if (settings.max_parallel_replicas == 0) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of the setting max_parallel_replicas must be greater than 0"); + max_entries = settings.max_parallel_replicas; + } else + { throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown pool allocation mode"); + } if (!priority_func) priority_func = makeGetPriorityFunc(settings); diff --git a/src/Client/HedgedConnectionsFactory.cpp b/src/Client/HedgedConnectionsFactory.cpp index f5b074a0257..a4e5dbf04ac 100644 --- a/src/Client/HedgedConnectionsFactory.cpp +++ b/src/Client/HedgedConnectionsFactory.cpp @@ -82,6 +82,9 @@ std::vector HedgedConnectionsFactory::getManyConnections(PoolMode } case PoolMode::GET_MANY: { + if (max_parallel_replicas == 0) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of the setting max_parallel_replicas must be greater than 0"); + max_entries = max_parallel_replicas; break; } diff --git a/src/Client/HedgedConnectionsFactory.h b/src/Client/HedgedConnectionsFactory.h index ce7b553acdd..dd600d58e1e 100644 --- a/src/Client/HedgedConnectionsFactory.h +++ b/src/Client/HedgedConnectionsFactory.h @@ -158,7 +158,7 @@ private: /// checking the number of requested replicas that are still in process). 
size_t requested_connections_count = 0; - const size_t max_parallel_replicas = 0; + const size_t max_parallel_replicas = 1; const bool skip_unavailable_shards = 0; }; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index d34294b4c4b..fe5e5dc69d1 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -871,7 +871,7 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis() { /// The query could use trivial count if it didn't use parallel replicas, so let's disable it and reanalyze context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0)); - context->setSetting("max_parallel_replicas", UInt64{0}); + context->setSetting("max_parallel_replicas", UInt64{1}); LOG_DEBUG(log, "Disabling parallel replicas to be able to use a trivial count optimization"); return true; } @@ -909,7 +909,7 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis() if (number_of_replicas_to_use <= 1) { context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0)); - context->setSetting("max_parallel_replicas", UInt64{0}); + context->setSetting("max_parallel_replicas", UInt64{1}); LOG_DEBUG(log, "Disabling parallel replicas because there aren't enough rows to read"); return true; } diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index e6a459d0e8a..2b1cd7fb353 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -295,7 +295,7 @@ bool applyTrivialCountIfPossible( /// The query could use trivial count if it didn't use parallel replicas, so let's disable it query_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0)); - query_context->setSetting("max_parallel_replicas", UInt64{0}); + query_context->setSetting("max_parallel_replicas", UInt64{1}); LOG_TRACE(getLogger("Planner"), "Disabling parallel replicas to be able to use a trivial count optimization"); } @@ -756,7 +756,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres { planner_context->getMutableQueryContext()->setSetting( "allow_experimental_parallel_reading_from_replicas", Field(0)); - planner_context->getMutableQueryContext()->setSetting("max_parallel_replicas", UInt64{0}); + planner_context->getMutableQueryContext()->setSetting("max_parallel_replicas", UInt64{1}); LOG_DEBUG(getLogger("Planner"), "Disabling parallel replicas because there aren't enough rows to read"); } else if (number_of_replicas_to_use < settings.max_parallel_replicas) diff --git a/tests/queries/0_stateless/03001_max_parallel_replicas_zero_value.reference b/tests/queries/0_stateless/03001_max_parallel_replicas_zero_value.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03001_max_parallel_replicas_zero_value.sql b/tests/queries/0_stateless/03001_max_parallel_replicas_zero_value.sql new file mode 100644 index 00000000000..611aa4777ba --- /dev/null +++ b/tests/queries/0_stateless/03001_max_parallel_replicas_zero_value.sql @@ -0,0 +1,5 @@ +drop table if exists test_d; +create table test_d engine=Distributed(test_cluster_two_shard_three_replicas_localhost, system, numbers); +select * from test_d limit 10 settings max_parallel_replicas = 0, prefer_localhost_replica = 0; --{serverError BAD_ARGUMENTS} +drop table test_d; + From 8aa9f36484bbe814a1e3edccc608e71b73915857 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> 
Date: Mon, 26 Feb 2024 22:05:54 +0100 Subject: [PATCH 05/22] Fix style --- src/Client/ConnectionPoolWithFailover.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Client/ConnectionPoolWithFailover.cpp b/src/Client/ConnectionPoolWithFailover.cpp index 46b9741c812..ad8ed0067d8 100644 --- a/src/Client/ConnectionPoolWithFailover.cpp +++ b/src/Client/ConnectionPoolWithFailover.cpp @@ -21,6 +21,7 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int ALL_CONNECTION_TRIES_FAILED; + extern const int BAD_ARGUMENTS; } From f264f0a0360baf1413ec38d3f3f30c70595064f4 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Mon, 26 Feb 2024 22:06:10 +0100 Subject: [PATCH 06/22] Fix style --- src/Client/HedgedConnectionsFactory.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Client/HedgedConnectionsFactory.cpp b/src/Client/HedgedConnectionsFactory.cpp index a4e5dbf04ac..16a03a696bd 100644 --- a/src/Client/HedgedConnectionsFactory.cpp +++ b/src/Client/HedgedConnectionsFactory.cpp @@ -19,6 +19,7 @@ namespace ErrorCodes extern const int ALL_CONNECTION_TRIES_FAILED; extern const int ALL_REPLICAS_ARE_STALE; extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; } HedgedConnectionsFactory::HedgedConnectionsFactory( From 58a53b42acb3b25a41e8529186db9df0d4387f77 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Tue, 27 Feb 2024 14:31:35 +0100 Subject: [PATCH 07/22] Set max_entries to min(max_parallel_replicas, all available reolicas) --- src/Client/HedgedConnectionsFactory.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Client/HedgedConnectionsFactory.cpp b/src/Client/HedgedConnectionsFactory.cpp index 16a03a696bd..703cc1f8821 100644 --- a/src/Client/HedgedConnectionsFactory.cpp +++ b/src/Client/HedgedConnectionsFactory.cpp @@ -86,7 +86,7 @@ std::vector HedgedConnectionsFactory::getManyConnections(PoolMode if (max_parallel_replicas == 0) throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of the setting max_parallel_replicas must be greater than 0"); - max_entries = max_parallel_replicas; + max_entries = std::min(max_parallel_replicas, shuffled_pools.size()); break; } } From 5771e739f0e65baae69f1e7abd42495d5fbc5488 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 27 Feb 2024 23:11:29 +0800 Subject: [PATCH 08/22] Update ReadSettings.h --- src/IO/ReadSettings.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index 2c79735317d..846fcd668f0 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -98,7 +98,6 @@ struct ReadSettings bool enable_filesystem_cache = true; bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false; bool enable_filesystem_cache_log = false; - /// Don't populate cache when the read is not part of query execution (e.g. background thread). 
bool force_read_through_cache_merges = false; size_t filesystem_cache_segments_batch_size = 20; From 1eba06dc113881b2845d36a7d3a4703ad64659d7 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 27 Feb 2024 23:12:41 +0800 Subject: [PATCH 09/22] Update 02241_filesystem_cache_on_write_operations.reference --- .../02241_filesystem_cache_on_write_operations.reference | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference index c03b928684b..53566a18edc 100644 --- a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference +++ b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference @@ -95,13 +95,7 @@ INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000) SELECT count(), sum(size) FROM system.filesystem_cache 24 84045 SYSTEM START MERGES test_02241 -SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes' -85146 -SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes' OPTIMIZE TABLE test_02241 FINAL -SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes' -251542 -SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes' SELECT count(), sum(size) FROM system.filesystem_cache 32 167243 ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100 From ffd69e0e127f64cf90a41d7b710c375ced13f092 Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 27 Feb 2024 23:22:04 +0800 Subject: [PATCH 10/22] Move setting to merge-tree level --- src/Core/ServerSettings.h | 3 --- src/Interpreters/Context.cpp | 1 - src/Storages/MergeTree/MergeTreeSequentialSource.cpp | 3 ++- src/Storages/MergeTree/MergeTreeSettings.h | 1 + .../config.d/force_read_through_cache_for_merges.xml | 4 +++- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index 0283b98638f..0063b3a2bd6 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -102,9 +102,6 @@ namespace DB M(UInt64, tables_loader_background_pool_size, 0, "The maximum number of threads that will be used for background async loading of tables. Zero means use all CPUs.", 0) \ M(Bool, async_load_databases, false, "Enable asynchronous loading of databases and tables to speedup server startup. Queries to not yet loaded entity will be blocked until load is finished.", 0) \ M(Bool, display_secrets_in_show_and_select, false, "Allow showing secrets in SHOW and SELECT queries via a format setting and a grant", 0) \ - \ - M(Bool, force_read_through_cache_for_merges, false, "Force read-through filesystem cache for merges", 0) \ - \ M(Seconds, keep_alive_timeout, DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT, "The number of seconds that ClickHouse waits for incoming requests before closing the connection.", 0) \ M(Seconds, replicated_fetches_http_connection_timeout, 0, "HTTP connection timeout for part fetch requests. Inherited from default profile `http_connection_timeout` if not set explicitly.", 0) \ M(Seconds, replicated_fetches_http_send_timeout, 0, "HTTP send timeout for part fetch requests. 
Inherited from default profile `http_send_timeout` if not set explicitly.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index a974eaca067..55a4df10206 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -5079,7 +5079,6 @@ ReadSettings Context::getReadSettings() const res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache; res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log; res.filesystem_cache_segments_batch_size = settings.filesystem_cache_segments_batch_size; - res.force_read_through_cache_merges = getServerSettings().force_read_through_cache_for_merges; res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size; res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache; diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index e375e8b0a9f..6b0c5ccb59a 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -151,7 +151,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( const auto & context = storage.getContext(); ReadSettings read_settings = context->getReadSettings(); - read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = !read_settings.force_read_through_cache_merges; + read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = !storage.getSettings()->force_read_through_cache_for_merges; + /// It does not make sense to use pthread_threadpool for background merges/mutations /// And also to preserve backward compatibility read_settings.local_fs_method = LocalFSReadMethod::pread; diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index b64632b6139..9cb74e76dd5 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -191,6 +191,7 @@ struct Settings; M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for zero-copy table-independent info.", 0) \ M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \ M(Bool, cache_populated_by_fetch, false, "Only available in ClickHouse Cloud", 0) \ + M(Bool, force_read_through_cache_for_merges, false, "Force read-through filesystem cache for merges", 0) \ M(Bool, allow_experimental_block_number_column, false, "Enable persisting column _block_number for each row.", 0) \ M(Bool, allow_experimental_replacing_merge_with_cleanup, false, "Allow experimental CLEANUP merges for ReplacingMergeTree with is_deleted column.", 0) \ \ diff --git a/tests/integration/test_filesystem_cache/config.d/force_read_through_cache_for_merges.xml b/tests/integration/test_filesystem_cache/config.d/force_read_through_cache_for_merges.xml index bb2a6e850a4..23d3fdea800 100644 --- a/tests/integration/test_filesystem_cache/config.d/force_read_through_cache_for_merges.xml +++ b/tests/integration/test_filesystem_cache/config.d/force_read_through_cache_for_merges.xml @@ -1,3 +1,5 @@ - 1 + + 1 + From 3188c1ebdac52efbdadb8f64a13b0c4b6f4e1acc Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Wed, 28 Feb 2024 13:51:48 +0800 Subject: [PATCH 11/22] Update test.py --- tests/integration/test_filesystem_cache/test.py | 13 +++++++++++-- 1 file 
changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_filesystem_cache/test.py b/tests/integration/test_filesystem_cache/test.py index f32fa4e9823..0cb1866f8e4 100644 --- a/tests/integration/test_filesystem_cache/test.py +++ b/tests/integration/test_filesystem_cache/test.py @@ -94,12 +94,21 @@ def test_parallel_cache_loading_on_startup(cluster, node_name): cache_state = node.query( "SELECT key, file_segment_range_begin, size FROM system.filesystem_cache WHERE size > 0 ORDER BY key, file_segment_range_begin, size" ) + keys = ( + node.query( + "SELECT distinct(key) FROM system.filesystem_cache WHERE size > 0 ORDER BY key, file_segment_range_begin, size" + ) + .strip() + .splitlines() + ) node.restart_clickhouse() - assert cache_count == int(node.query("SELECT count() FROM system.filesystem_cache")) + # < because of additional files loaded into cache on server startup. + assert cache_count <= int(node.query("SELECT count() FROM system.filesystem_cache")) + keys_set = ",".join(["'" + x + "'" for x in keys]) assert cache_state == node.query( - "SELECT key, file_segment_range_begin, size FROM system.filesystem_cache ORDER BY key, file_segment_range_begin, size" + f"SELECT key, file_segment_range_begin, size FROM system.filesystem_cache WHERE key in ({keys_set}) ORDER BY key, file_segment_range_begin, size" ) assert node.contains_in_log("Loading filesystem cache with 30 threads") From 6fbfd42a0522fe4161d367e3d923f2480c1df21a Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Mon, 4 Mar 2024 16:13:44 +0800 Subject: [PATCH 12/22] Update 02241_filesystem_cache_on_write_operations.reference --- .../02241_filesystem_cache_on_write_operations.reference | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference index 53566a18edc..186dcc1eeb2 100644 --- a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference +++ b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference @@ -205,13 +205,7 @@ INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000) SELECT count(), sum(size) FROM system.filesystem_cache 24 84045 SYSTEM START MERGES test_02241 -SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes' -81715476 -SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes' OPTIMIZE TABLE test_02241 FINAL -SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes' -81881872 -SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes' SELECT count(), sum(size) FROM system.filesystem_cache 32 167243 ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100 From b0050566e22d10ca621a33c1b4fedb987ad2620c Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 5 Mar 2024 12:14:56 +0800 Subject: [PATCH 13/22] Fix style check --- src/Disks/IO/ReadBufferFromRemoteFSGather.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index fe8d63b053d..298000ac015 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -21,7 +21,7 @@ namespace { return settings.remote_fs_cache && settings.enable_filesystem_cache; } - + bool 
withPageCache(const ReadSettings & settings, bool with_file_cache) { return settings.page_cache && !with_file_cache && settings.use_page_cache_for_disks_without_file_cache; From 2ee846b393d79f3f0d9710ddf910552ba1e040cd Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 5 Mar 2024 14:07:56 +0800 Subject: [PATCH 14/22] Fix build --- src/Disks/IO/ReadBufferFromRemoteFSGather.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 298000ac015..f72e6634465 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -17,7 +17,7 @@ using namespace DB; namespace { - bool withCache(const ReadSettings & settings) + bool withFileCache(const ReadSettings & settings) { return settings.remote_fs_cache && settings.enable_filesystem_cache; } From f77b5963748c321975d8bd131e794dcc57002fc8 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 7 Mar 2024 16:17:27 +0800 Subject: [PATCH 15/22] Fix test --- .../integration/test_filesystem_cache/test.py | 47 ++++++++++++------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/tests/integration/test_filesystem_cache/test.py b/tests/integration/test_filesystem_cache/test.py index 0cb1866f8e4..63316aba57e 100644 --- a/tests/integration/test_filesystem_cache/test.py +++ b/tests/integration/test_filesystem_cache/test.py @@ -350,6 +350,20 @@ def test_custom_cached_disk(cluster): def test_force_filesystem_cache_on_merges(cluster): def test(node, forced_read_through_cache_on_merge): + def to_int(value): + if value == "": + return 0 + else: + return int(value) + + r_cache_count = to_int(node.query( + "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" + )) + + w_cache_count = to_int(node.query( + "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" + )) + node.query( """ DROP TABLE IF EXISTS test SYNC; @@ -376,36 +390,33 @@ def test_force_filesystem_cache_on_merges(cluster): assert int(node.query("SELECT count() FROM system.filesystem_cache")) > 0 assert int(node.query("SELECT max(size) FROM system.filesystem_cache")) == 1024 - write_count = int( + w_cache_count_2 = int( node.query( "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" ) ) - assert write_count > 100000 - assert "" == node.query( - "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" + assert w_cache_count_2 > w_cache_count + + r_cache_count_2 = to_int( + node.query( + "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" + ) ) + assert r_cache_count_2 == r_cache_count node.query("SYSTEM DROP FILESYSTEM CACHE") node.query("OPTIMIZE TABLE test FINAL") - new_write_count = int( + r_cache_count_3 = to_int( node.query( - "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" - ) - ) - assert new_write_count >= write_count - - if forced_read_through_cache_on_merge: - assert 100000 < int( - node.query( - "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" - ) - ) - else: - assert "" == node.query( "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" ) + ) + + if forced_read_through_cache_on_merge: + assert r_cache_count_3 > r_cache_count + else: + assert r_cache_count_3 == r_cache_count node = cluster.instances["node_force_read_through_cache_on_merge"] test(node, 
True) From c7f5b1631c359c61b6e4c74727092df73e956922 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Thu, 7 Mar 2024 08:30:34 +0000 Subject: [PATCH 16/22] Automatic style fix --- tests/integration/test_filesystem_cache/test.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_filesystem_cache/test.py b/tests/integration/test_filesystem_cache/test.py index 63316aba57e..c44d817c57c 100644 --- a/tests/integration/test_filesystem_cache/test.py +++ b/tests/integration/test_filesystem_cache/test.py @@ -356,13 +356,17 @@ def test_force_filesystem_cache_on_merges(cluster): else: return int(value) - r_cache_count = to_int(node.query( - "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" - )) + r_cache_count = to_int( + node.query( + "SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'" + ) + ) - w_cache_count = to_int(node.query( - "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" - )) + w_cache_count = to_int( + node.query( + "SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'" + ) + ) node.query( """ From 4c3fa4e8642780d2e27ed4f31590345646f0bb26 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 8 Mar 2024 09:42:49 +0000 Subject: [PATCH 17/22] Use global scalars cache in analyzer. --- src/Analyzer/Passes/QueryAnalysisPass.cpp | 47 ++++++++++++++++++++--- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/src/Analyzer/Passes/QueryAnalysisPass.cpp b/src/Analyzer/Passes/QueryAnalysisPass.cpp index c62641ca05c..e7c48b81fbc 100644 --- a/src/Analyzer/Passes/QueryAnalysisPass.cpp +++ b/src/Analyzer/Passes/QueryAnalysisPass.cpp @@ -86,6 +86,7 @@ namespace ProfileEvents { extern const Event ScalarSubqueriesGlobalCacheHit; + extern const Event ScalarSubqueriesLocalCacheHit; extern const Event ScalarSubqueriesCacheMiss; } @@ -1444,7 +1445,8 @@ private: std::unordered_map node_to_tree_size; /// Global scalar subquery to scalar value map - std::unordered_map scalar_subquery_to_scalar_value; + std::unordered_map scalar_subquery_to_scalar_value_local; + std::unordered_map scalar_subquery_to_scalar_value_global; const bool only_analyze; }; @@ -1951,6 +1953,24 @@ QueryTreeNodePtr QueryAnalyzer::tryGetLambdaFromSQLUserDefinedFunctions(const st return result_node; } +bool subtreeHasViewSource(const IQueryTreeNode * node, const Context & context) +{ + if (!node) + return false; + + if (const auto * table_node = node->as()) + { + if (table_node->getStorageID().getFullNameNotQuoted() == context.getViewSource()->getStorageID().getFullNameNotQuoted()) + return true; + } + + for (const auto & child : node->getChildren()) + if (subtreeHasViewSource(child.get(), context)) + return true; + + return false; +} + /// Evaluate scalar subquery and perform constant folding if scalar subquery does not have constant value void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, IdentifierResolveScope & scope) { @@ -1970,13 +1990,28 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden node_without_alias->removeAlias(); QueryTreeNodePtrWithHash node_with_hash(node_without_alias); - auto scalar_value_it = scalar_subquery_to_scalar_value.find(node_with_hash); + auto str_hash = DB::toString(node_with_hash.hash); - if (scalar_value_it != scalar_subquery_to_scalar_value.end()) + bool can_use_global_scalars = !(context->getViewSource() && subtreeHasViewSource(node_without_alias.get(), *context)); 
+ + auto & scalars_cache = can_use_global_scalars ? scalar_subquery_to_scalar_value_global : scalar_subquery_to_scalar_value_local; + auto scalar_value_it = scalars_cache.find(node_with_hash); + + if (scalar_value_it != scalars_cache.end()) { - ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit); + if (can_use_global_scalars) + ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit); + else + ProfileEvents::increment(ProfileEvents::ScalarSubqueriesLocalCacheHit); + scalar_block = scalar_value_it->second; } + else if (context->hasQueryContext() && can_use_global_scalars && context->getQueryContext()->hasScalar(str_hash)) + { + scalar_block = context->getQueryContext()->getScalar(str_hash); + scalar_subquery_to_scalar_value_global.emplace(node_with_hash, scalar_block); + ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit); + } else { ProfileEvents::increment(ProfileEvents::ScalarSubqueriesCacheMiss); @@ -2087,7 +2122,9 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden } } - scalar_subquery_to_scalar_value.emplace(node_with_hash, scalar_block); + scalars_cache.emplace(node_with_hash, scalar_block); + if (can_use_global_scalars && context->hasQueryContext()) + context->getQueryContext()->addScalar(str_hash, scalar_block); } const auto & scalar_column_with_type = scalar_block.safeGetByPosition(0); From cacbd3ce3450e9a2f8915b392ce8ec08f9662282 Mon Sep 17 00:00:00 2001 From: Max Kainov Date: Fri, 8 Mar 2024 14:13:23 +0000 Subject: [PATCH 18/22] CI: support merge queue event in pr_info #do_not_test --- tests/ci/pr_info.py | 56 +++++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/tests/ci/pr_info.py b/tests/ci/pr_info.py index aba32d88c0a..6f4b400f7a8 100644 --- a/tests/ci/pr_info.py +++ b/tests/ci/pr_info.py @@ -44,11 +44,12 @@ RETRY_SLEEP = 0 class EventType: - UNKNOWN = 0 - PUSH = 1 - PULL_REQUEST = 2 - SCHEDULE = 3 - DISPATCH = 4 + UNKNOWN = "unknown" + PUSH = "commits" + PULL_REQUEST = "pull_request" + SCHEDULE = "schedule" + DISPATCH = "dispatch" + MERGE_QUEUE = "merge_group" def get_pr_for_commit(sha, ref): @@ -114,6 +115,12 @@ class PRInfo: # release_pr and merged_pr are used for docker images additional cache self.release_pr = 0 self.merged_pr = 0 + self.labels = set() + + repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}" + self.task_url = GITHUB_RUN_URL + self.repo_full_name = GITHUB_REPOSITORY + self.event_type = EventType.UNKNOWN ref = github_event.get("ref", "refs/heads/master") if ref and ref.startswith("refs/heads/"): @@ -154,10 +161,6 @@ class PRInfo: else: self.sha = github_event["pull_request"]["head"]["sha"] - repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}" - self.task_url = GITHUB_RUN_URL - - self.repo_full_name = GITHUB_REPOSITORY self.commit_html_url = f"{repo_prefix}/commits/{self.sha}" self.pr_html_url = f"{repo_prefix}/pull/{self.number}" @@ -176,7 +179,7 @@ class PRInfo: self.body = github_event["pull_request"]["body"] self.labels = { label["name"] for label in github_event["pull_request"]["labels"] - } # type: Set[str] + } self.user_login = github_event["pull_request"]["user"]["login"] # type: str self.user_orgs = set() # type: Set[str] @@ -191,6 +194,28 @@ class PRInfo: self.diff_urls.append(self.compare_pr_url(github_event["pull_request"])) + elif ( + EventType.MERGE_QUEUE in github_event + ): # pull request and other similar events + self.event_type = EventType.MERGE_QUEUE + # FIXME: need pr? 
we can parse it from ["head_ref": "refs/heads/gh-readonly-queue/test-merge-queue/pr-6751-4690229995a155e771c52e95fbd446d219c069bf"] + self.number = 0 + self.sha = github_event[EventType.MERGE_QUEUE]["head_sha"] + self.base_ref = github_event[EventType.MERGE_QUEUE]["base_ref"] + base_sha = github_event[EventType.MERGE_QUEUE]["base_sha"] # type: str + # ClickHouse/ClickHouse + self.base_name = github_event["repository"]["full_name"] + # any_branch-name - the name of working branch name + self.head_ref = github_event[EventType.MERGE_QUEUE]["head_ref"] + # UserName/ClickHouse or ClickHouse/ClickHouse + self.head_name = self.base_name + self.user_login = github_event["sender"]["login"] + self.diff_urls.append( + github_event["repository"]["compare_url"] + .replace("{base}", base_sha) + .replace("{head}", self.sha) + ) + elif "commits" in github_event: self.event_type = EventType.PUSH # `head_commit` always comes with `commits` @@ -203,10 +228,8 @@ class PRInfo: logging.error("Failed to convert %s to integer", merged_pr) self.sha = github_event["after"] pull_request = get_pr_for_commit(self.sha, github_event["ref"]) - repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}" - self.task_url = GITHUB_RUN_URL self.commit_html_url = f"{repo_prefix}/commits/{self.sha}" - self.repo_full_name = GITHUB_REPOSITORY + if pull_request is None or pull_request["state"] == "closed": # it's merged PR to master self.number = 0 @@ -272,11 +295,7 @@ class PRInfo: "GITHUB_SHA", "0000000000000000000000000000000000000000" ) self.number = 0 - self.labels = set() - repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}" - self.task_url = GITHUB_RUN_URL self.commit_html_url = f"{repo_prefix}/commits/{self.sha}" - self.repo_full_name = GITHUB_REPOSITORY self.pr_html_url = f"{repo_prefix}/commits/{ref}" self.base_ref = ref self.base_name = self.repo_full_name @@ -300,6 +319,9 @@ class PRInfo: def is_scheduled(self): return self.event_type == EventType.SCHEDULE + def is_merge_queue(self): + return self.event_type == EventType.MERGE_QUEUE + def is_dispatched(self): return self.event_type == EventType.DISPATCH From 6f31a77f704b4a0400d433541b67236f6d87a958 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 8 Mar 2024 15:33:45 +0000 Subject: [PATCH 19/22] Fix test 02174_cte_scalar_cache_mv --- src/Analyzer/Passes/QueryAnalysisPass.cpp | 7 +- .../02174_cte_scalar_cache_mv.reference | 63 ++++++++++++ .../0_stateless/02174_cte_scalar_cache_mv.sql | 99 +++++++++++++++++++ 3 files changed, 165 insertions(+), 4 deletions(-) diff --git a/src/Analyzer/Passes/QueryAnalysisPass.cpp b/src/Analyzer/Passes/QueryAnalysisPass.cpp index e7c48b81fbc..1da5d86edf3 100644 --- a/src/Analyzer/Passes/QueryAnalysisPass.cpp +++ b/src/Analyzer/Passes/QueryAnalysisPass.cpp @@ -1992,19 +1992,18 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden QueryTreeNodePtrWithHash node_with_hash(node_without_alias); auto str_hash = DB::toString(node_with_hash.hash); - bool can_use_global_scalars = !(context->getViewSource() && subtreeHasViewSource(node_without_alias.get(), *context)); + bool can_use_global_scalars = !only_analyze && !(context->getViewSource() && subtreeHasViewSource(node_without_alias.get(), *context)); auto & scalars_cache = can_use_global_scalars ? 
scalar_subquery_to_scalar_value_global : scalar_subquery_to_scalar_value_local; - auto scalar_value_it = scalars_cache.find(node_with_hash); - if (scalar_value_it != scalars_cache.end()) + if (scalars_cache.contains(node_with_hash)) { if (can_use_global_scalars) ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit); else ProfileEvents::increment(ProfileEvents::ScalarSubqueriesLocalCacheHit); - scalar_block = scalar_value_it->second; + scalar_block = scalars_cache.at(node_with_hash); } else if (context->hasQueryContext() && can_use_global_scalars && context->getQueryContext()->hasScalar(str_hash)) { diff --git a/tests/queries/0_stateless/02174_cte_scalar_cache_mv.reference b/tests/queries/0_stateless/02174_cte_scalar_cache_mv.reference index 8ec3608317f..dcfab092b5c 100644 --- a/tests/queries/0_stateless/02174_cte_scalar_cache_mv.reference +++ b/tests/queries/0_stateless/02174_cte_scalar_cache_mv.reference @@ -19,6 +19,48 @@ 94 94 94 94 5 99 99 99 99 5 02177_MV 7 80 22 +4 4 4 4 5 +9 9 9 9 5 +14 14 14 14 5 +19 19 19 19 5 +24 24 24 24 5 +29 29 29 29 5 +34 34 34 34 5 +39 39 39 39 5 +44 44 44 44 5 +49 49 49 49 5 +54 54 54 54 5 +59 59 59 59 5 +64 64 64 64 5 +69 69 69 69 5 +74 74 74 74 5 +79 79 79 79 5 +84 84 84 84 5 +89 89 89 89 5 +94 94 94 94 5 +99 99 99 99 5 +02177_MV 0 0 22 +10 +40 +70 +100 +130 +160 +190 +220 +250 +280 +310 +340 +370 +400 +430 +460 +490 +520 +550 +580 +02177_MV_2 0 0 21 10 40 70 @@ -61,3 +103,24 @@ 188 198 02177_MV_3 20 0 1 +8 +18 +28 +38 +48 +58 +68 +78 +88 +98 +108 +118 +128 +138 +148 +158 +168 +178 +188 +198 +02177_MV_3 19 0 2 diff --git a/tests/queries/0_stateless/02174_cte_scalar_cache_mv.sql b/tests/queries/0_stateless/02174_cte_scalar_cache_mv.sql index 742d72fe2b2..ca54b9e1400 100644 --- a/tests/queries/0_stateless/02174_cte_scalar_cache_mv.sql +++ b/tests/queries/0_stateless/02174_cte_scalar_cache_mv.sql @@ -14,6 +14,8 @@ CREATE MATERIALIZED VIEW mv1 TO t2 AS FROM t1 LIMIT 5; +set allow_experimental_analyzer = 0; + -- FIRST INSERT INSERT INTO t1 WITH @@ -58,8 +60,48 @@ WHERE AND query LIKE '-- FIRST INSERT\nINSERT INTO t1\n%' AND event_date >= yesterday() AND event_time > now() - interval 10 minute; +truncate table t2; +set allow_experimental_analyzer = 1; + +-- FIRST INSERT ANALYZER +INSERT INTO t1 +WITH + (SELECT max(i) FROM t1) AS t1 +SELECT + number as i, + t1 + t1 + t1 AS j -- Using global cache +FROM system.numbers +LIMIT 100 +SETTINGS + min_insert_block_size_rows=5, + max_insert_block_size=5, + min_insert_block_size_rows_for_materialized_views=5, + max_block_size=5, + max_threads=1; + +SELECT k, l, m, n, count() +FROM t2 +GROUP BY k, l, m, n +ORDER BY k, l, m, n; + +SYSTEM FLUSH LOGS; + +SELECT + '02177_MV', + ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit, + ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit, + ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss +FROM system.query_log +WHERE + current_database = currentDatabase() + AND type = 'QueryFinish' + AND query LIKE '-- FIRST INSERT ANALYZER\nINSERT INTO t1\n%' + AND event_date >= yesterday() AND event_time > now() - interval 10 minute; + DROP TABLE mv1; +set allow_experimental_analyzer = 0; + CREATE TABLE t3 (z Int64) ENGINE = Memory; CREATE MATERIALIZED VIEW mv2 TO t3 AS SELECT @@ -91,8 +133,36 @@ WHERE AND query LIKE '-- SECOND INSERT\nINSERT INTO t1%' AND event_date >= yesterday() AND event_time > now() - interval 10 minute; +truncate table t3; +set allow_experimental_analyzer = 1; + +-- SECOND INSERT ANALYZER +INSERT 
INTO t1 +SELECT 0 as i, number as j from numbers(100) +SETTINGS + min_insert_block_size_rows=5, + max_insert_block_size=5, + min_insert_block_size_rows_for_materialized_views=5, + max_block_size=5, + max_threads=1; + +SELECT * FROM t3 ORDER BY z ASC; +SYSTEM FLUSH LOGS; +SELECT + '02177_MV_2', + ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit, + ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit, + ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss +FROM system.query_log +WHERE + current_database = currentDatabase() + AND type = 'QueryFinish' + AND query LIKE '-- SECOND INSERT ANALYZER\nINSERT INTO t1%' + AND event_date >= yesterday() AND event_time > now() - interval 10 minute; + DROP TABLE mv2; +set allow_experimental_analyzer = 0; CREATE TABLE t4 (z Int64) ENGINE = Memory; CREATE MATERIALIZED VIEW mv3 TO t4 AS @@ -126,6 +196,35 @@ WHERE AND query LIKE '-- THIRD INSERT\nINSERT INTO t1%' AND event_date >= yesterday() AND event_time > now() - interval 10 minute; +truncate table t4; +set allow_experimental_analyzer = 1; + +-- THIRD INSERT ANALYZER +INSERT INTO t1 +SELECT number as i, number as j from numbers(100) + SETTINGS + min_insert_block_size_rows=5, + max_insert_block_size=5, + min_insert_block_size_rows_for_materialized_views=5, + max_block_size=5, + max_threads=1; +SYSTEM FLUSH LOGS; + +SELECT * FROM t4 ORDER BY z ASC; + +SELECT + '02177_MV_3', + ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit, + ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit, + ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss +FROM system.query_log +WHERE + current_database = currentDatabase() + AND type = 'QueryFinish' + AND query LIKE '-- THIRD INSERT ANALYZER\nINSERT INTO t1%' + AND event_date >= yesterday() AND event_time > now() - interval 10 minute; + + DROP TABLE mv3; DROP TABLE t1; DROP TABLE t2; From ceb82cb3bd8579c4ba8accd6e701f4a5cd05d505 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 8 Mar 2024 15:34:19 +0000 Subject: [PATCH 20/22] Fix test 02174_cte_scalar_cache_mv --- tests/analyzer_tech_debt.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/analyzer_tech_debt.txt b/tests/analyzer_tech_debt.txt index dc6284d20c5..e856da394d2 100644 --- a/tests/analyzer_tech_debt.txt +++ b/tests/analyzer_tech_debt.txt @@ -11,7 +11,6 @@ 01761_cast_to_enum_nullable 01925_join_materialized_columns 01952_optimize_distributed_group_by_sharding_key -02174_cte_scalar_cache_mv 02354_annoy # Check after constants refactoring 02901_parallel_replicas_rollup From f656a015385898602cb651b419b46927f99ab602 Mon Sep 17 00:00:00 2001 From: Max Kainov Date: Mon, 11 Mar 2024 16:39:13 +0000 Subject: [PATCH 21/22] CI: fix sync build issue with reuse #do_not_test --- tests/ci/ci.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/tests/ci/ci.py b/tests/ci/ci.py index 9d57f161be3..cc4d0b11eef 100644 --- a/tests/ci/ci.py +++ b/tests/ci/ci.py @@ -140,7 +140,7 @@ class CiCache: self.s3 = s3 self.job_digests = job_digests self.cache_s3_paths = { - job_type: f"{self._S3_CACHE_PREFIX}/{job_type.value}-{self.job_digests[self._get_reference_job_name(job_type)]}/" + job_type: f"{self._S3_CACHE_PREFIX}/{job_type.value}-{self._get_digest_for_job_type(self.job_digests, job_type)}/" for job_type in self.JobType } self.s3_record_prefixes = { @@ -155,14 +155,23 @@ class CiCache: if not self._LOCAL_CACHE_PATH.exists(): self._LOCAL_CACHE_PATH.mkdir(parents=True, 
exist_ok=True) - def _get_reference_job_name(self, job_type: JobType) -> str: - res = Build.PACKAGE_RELEASE + def _get_digest_for_job_type( + self, job_digests: Dict[str, str], job_type: JobType + ) -> str: if job_type == self.JobType.DOCS: - res = JobNames.DOCS_CHECK + res = job_digests[JobNames.DOCS_CHECK] elif job_type == self.JobType.SRCS: - res = Build.PACKAGE_RELEASE + # any build type job has the same digest - pick up Build.PACKAGE_RELEASE or Build.PACKAGE_ASAN as a failover + # Build.PACKAGE_RELEASE may not exist in the list if we have reduced CI pipeline + if Build.PACKAGE_RELEASE in job_digests: + res = job_digests[Build.PACKAGE_RELEASE] + elif Build.PACKAGE_ASAN in job_digests: + # failover, if failover does not work - fix it! + res = job_digests[Build.PACKAGE_ASAN] + else: + assert False, "BUG, no build job in digest' list" else: - assert False + assert False, "BUG, New JobType? - please update func" return res def _get_record_file_name( From f973e405eeb4f28a6a937c26d19cad54acd00eb4 Mon Sep 17 00:00:00 2001 From: Max Kainov Date: Wed, 7 Feb 2024 19:36:28 +0000 Subject: [PATCH 22/22] CI: fixing ARM integration tests #do_not_test --- tests/ci/ci.py | 41 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/tests/ci/ci.py b/tests/ci/ci.py index 9d57f161be3..898d23be843 100644 --- a/tests/ci/ci.py +++ b/tests/ci/ci.py @@ -1183,13 +1183,13 @@ def _configure_jobs( if batches_to_do: jobs_to_do.append(job) + jobs_params[job] = { + "batches": batches_to_do, + "num_batches": num_batches, + } elif add_to_skip: # treat job as being skipped only if it's controlled by digest jobs_to_skip.append(job) - jobs_params[job] = { - "batches": batches_to_do, - "num_batches": num_batches, - } if not pr_info.is_release_branch(): # randomization bucket filtering (pick one random job from each bucket, for jobs with configured random_bucket property) @@ -1268,6 +1268,33 @@ def _configure_jobs( jobs_to_do = list( set(job for job in jobs_to_do_requested if job not in jobs_to_skip) ) + # if requested job does not have params in jobs_params (it happens for "run_by_label" job) + # we need to add params - otherwise it won't run as "batches" list will be empty + for job in jobs_to_do: + if job not in jobs_params: + num_batches = CI_CONFIG.get_job_config(job).num_batches + jobs_params[job] = { + "batches": list(range(num_batches)), + "num_batches": num_batches, + } + + requested_batches = set() + for token in commit_tokens: + if token.startswith("batch_"): + try: + batches = [ + int(batch) for batch in token.removeprefix("batch_").split("_") + ] + except Exception: + print(f"ERROR: failed to parse commit tag [{token}]") + requested_batches.update(batches) + if requested_batches: + print( + f"NOTE: Only specific job batches were requested [{list(requested_batches)}]" + ) + for job, params in jobs_params.items(): + if params["num_batches"] > 1: + params["batches"] = list(requested_batches) return { "digests": digests, @@ -1372,7 +1399,11 @@ def _update_gh_statuses_action(indata: Dict, s3: S3Helper) -> None: def _fetch_commit_tokens(message: str) -> List[str]: pattern = r"#[\w-]+" matches = [match[1:] for match in re.findall(pattern, message)] - res = [match for match in matches if match in Labels or match.startswith("job_")] + res = [ + match + for match in matches + if match in Labels or match.startswith("job_") or match.startswith("batch_") + ] return res
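
Taken together, the cache-related patches in this series turn read-through filesystem caching during merges into a MergeTree-level setting (`force_read_through_cache_for_merges`, added to MergeTreeSettings.h and consulted by MergeTreeSequentialSource) rather than a query-level one. A minimal usage sketch, assuming a cached object-storage disk behind a storage policy named 's3_cache' — the policy name, table name and schema are illustrative assumptions, not part of the patches above:

CREATE TABLE test_force_cache_on_merge
(
    key UInt64,
    value String
)
ENGINE = MergeTree
ORDER BY key
SETTINGS
    storage_policy = 's3_cache',
    force_read_through_cache_for_merges = 1;

INSERT INTO test_force_cache_on_merge SELECT number, toString(number) FROM numbers(100000);
INSERT INTO test_force_cache_on_merge SELECT number, toString(number) FROM numbers(100000);

SYSTEM DROP FILESYSTEM CACHE;
OPTIMIZE TABLE test_force_cache_on_merge FINAL;

-- With the setting enabled, the merge is expected to read the source parts through the
-- filesystem cache, so the CachedReadBufferCacheWriteBytes event should grow; with it
-- disabled, the merge bypasses the cache. This mirrors the assertions in
-- test_force_filesystem_cache_on_merges above.
SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes';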