From 32cfdc98b207ca89c1051fc0141d5a2231f9d5e9 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Wed, 4 Sep 2024 18:00:03 +0000 Subject: [PATCH 01/13] fix metadata_version in keeper --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 6 +++ .../MergeTree/ReplicatedMergeTreeQueue.h | 1 + src/Storages/StorageReplicatedMergeTree.cpp | 50 ++++++++++++++++++- src/Storages/StorageReplicatedMergeTree.h | 1 + 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 627bda3f8bf..9004f986c5e 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -2128,6 +2128,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.inserts_in_queue = 0; res.merges_in_queue = 0; res.part_mutations_in_queue = 0; + res.metadata_alters_in_queue = 0; res.queue_oldest_time = 0; res.inserts_oldest_time = 0; res.merges_oldest_time = 0; @@ -2170,6 +2171,11 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.oldest_part_to_mutate_to = entry->new_part_name; } } + + if (entry->type == LogEntry::ALTER_METADATA) + { + ++res.metadata_alters_in_queue; + } } return res; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 89ef6240558..2011d84eefe 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -453,6 +453,7 @@ public: UInt32 inserts_in_queue; UInt32 merges_in_queue; UInt32 part_mutations_in_queue; + UInt32 metadata_alters_in_queue; UInt32 queue_oldest_time; UInt32 inserts_oldest_time; UInt32 merges_oldest_time; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ff8e362aa36..7167afa7fd3 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ 
b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6155,6 +6155,8 @@ void StorageReplicatedMergeTree::alter( mutation_znode.reset(); auto current_metadata = getInMemoryMetadataPtr(); + // update metadata's metadata_version + // fixReplicaMetadataVersionIfNeeded(current_metadata->metadata_version); StorageInMemoryMetadata future_metadata = *current_metadata; commands.apply(future_metadata, query_context); @@ -6200,7 +6202,8 @@ void StorageReplicatedMergeTree::alter( size_t mutation_path_idx = std::numeric_limits::max(); String new_metadata_str = future_metadata_in_zk.toString(); - ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion())); + Int32 metadata_version = fixMetadataVersionInZooKeeper(); + ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version)); String new_columns_str = future_metadata.columns.toString(); ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1)); @@ -10641,4 +10644,49 @@ template std::optional StorageReplicatedMergeTree::all const std::vector & zookeeper_block_id_path, const String & zookeeper_path_prefix) const; +Int32 StorageReplicatedMergeTree::tryFixMetadataVersionInZooKeeper() +{ + const Int32 metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(); + if (metadata_version != 0) + { + /// No need to fix anything + return metadata_version; + } + + auto zookeeper = getZooKeeper(); + + Coordination::Stat stat; + zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); + if (stat.version == 0) + { + /// No need to fix anything + return metadata_version; + } + + queue.pullLogsToQueue(zookeeper); + if (queue.getStatus().metadata_alters_in_queue != 0) + { + LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); + return metadata_version; + } + + const Coordination::Requests ops = { + 
zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), + zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), + }; + Coordination::Responses ops_responses; + const auto code = current_zookeeper->tryMulti(ops, ops_responses); + if (code == Coordination::Error::ZOK) + { + LOG_DEBUG(log, "Successfully fixed metadata_version"); + return stat.version; + } + if (code == Coordination::Error::ZBADVERSION) + { + LOG_DEBUG(log, "No need to update metadata_version because table metadata has been updated on a different replica"); + return metadata_version; + } + throw zkutil::KeeperException(code); +} + } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 2e54f17d5d5..e591a800ea2 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -1013,6 +1013,7 @@ private: DataPartsVector::const_iterator it; }; + Int32 tryFixMetadataVersionInZooKeeper(); }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); From be55e1d2e166c033aaa369970d3c5b21cfda5807 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Thu, 5 Sep 2024 18:38:59 +0000 Subject: [PATCH 02/13] better --- .../ReplicatedMergeTreeAttachThread.cpp | 67 ++++++++++++++++++- .../ReplicatedMergeTreeAttachThread.h | 2 + src/Storages/StorageReplicatedMergeTree.cpp | 50 +------------- src/Storages/StorageReplicatedMergeTree.h | 2 - 4 files changed, 67 insertions(+), 54 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index 6e22a3515bc..f28b8f9e9a8 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -15,6 +15,7 @@ namespace ErrorCodes { extern const int SUPPORT_IS_DISABLED; extern const int REPLICA_STATUS_CHANGED; + 
extern const int LOGICAL_ERROR; } ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_) @@ -117,6 +118,66 @@ void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const z } } +Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper) +{ + const String & zookeeper_path = storage.zookeeper_path; + const String & replica_path = storage.replica_path; + + for (size_t i = 0; i != 2; ++i) + { + String replica_metadata_version_str; + const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version_str); + if (!replica_metadata_version_exists) + return -1; + + const Int32 metadata_version = parse(replica_metadata_version_str); + + if (metadata_version != 0) + { + /// No need to fix anything + return metadata_version; + } + + Coordination::Stat stat; + zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); + if (stat.version == 0) + { + /// No need to fix anything + return metadata_version; + } + + ReplicatedMergeTreeQueue & queue = storage.queue; + queue.pullLogsToQueue(zookeeper); + if (queue.getStatus().metadata_alters_in_queue != 0) + { + LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); + return metadata_version; + } + + const Coordination::Requests ops = { + zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), + zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), + }; + Coordination::Responses ops_responses; + const auto code = zookeeper->tryMulti(ops, ops_responses); + if (code == Coordination::Error::ZOK) + { + LOG_DEBUG(log, "Successfully set metadata_version to {}", stat.version); + return stat.version; + } + if (code != Coordination::Error::ZBADVERSION) + { + throw zkutil::KeeperException(code); + } + } + + /// Second attempt is only possible if 
metadata_version != 0 or metadata.version changed during the first attempt. + /// If metadata_version != 0, on second attempt we will return the new metadata_version. + /// If metadata.version changed, on second attempt we will either get metadata_version != 0 and return the new metadata_version or we will get metadata_alters_in_queue != 0 and return 0. + /// Either way, on second attempt this method should return. + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fix replica metadata_version in ZooKeeper after two attempts"); +} + void ReplicatedMergeTreeAttachThread::runImpl() { storage.setZooKeeper(); @@ -160,11 +221,11 @@ void ReplicatedMergeTreeAttachThread::runImpl() /// Just in case it was not removed earlier due to connection loss zookeeper->tryRemove(replica_path + "/flags/force_restore_data"); - String replica_metadata_version; - const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version); + const Int32 replica_metadata_version = fixReplicaMetadataVersionIfNeeded(zookeeper); + const bool replica_metadata_version_exists = replica_metadata_version != -1; if (replica_metadata_version_exists) { - storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse(replica_metadata_version))); + storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(replica_metadata_version)); } else { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h index 250a5ed34d1..bfc97442598 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h @@ -48,6 +48,8 @@ private: void runImpl(); void finalizeInitialization(); + + Int32 fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper); }; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 5632c24bca4..865a0cbe506 100644 --- 
a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6158,8 +6158,6 @@ void StorageReplicatedMergeTree::alter( mutation_znode.reset(); auto current_metadata = getInMemoryMetadataPtr(); - // update metadata's metadata_version - // fixReplicaMetadataVersionIfNeeded(current_metadata->metadata_version); StorageInMemoryMetadata future_metadata = *current_metadata; commands.apply(future_metadata, query_context); @@ -6205,8 +6203,7 @@ void StorageReplicatedMergeTree::alter( size_t mutation_path_idx = std::numeric_limits::max(); String new_metadata_str = future_metadata_in_zk.toString(); - Int32 metadata_version = fixMetadataVersionInZooKeeper(); - ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version)); + ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion())); String new_columns_str = future_metadata.columns.toString(); ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1)); @@ -10690,49 +10687,4 @@ template std::optional StorageReplicatedMergeTree::all const std::vector & zookeeper_block_id_path, const String & zookeeper_path_prefix) const; -Int32 StorageReplicatedMergeTree::tryFixMetadataVersionInZooKeeper() -{ - const Int32 metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(); - if (metadata_version != 0) - { - /// No need to fix anything - return metadata_version; - } - - auto zookeeper = getZooKeeper(); - - Coordination::Stat stat; - zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); - if (stat.version == 0) - { - /// No need to fix anything - return metadata_version; - } - - queue.pullLogsToQueue(zookeeper); - if (queue.getStatus().metadata_alters_in_queue != 0) - { - LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); - return metadata_version; - } - - const 
Coordination::Requests ops = { - zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), - zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), - }; - Coordination::Responses ops_responses; - const auto code = current_zookeeper->tryMulti(ops, ops_responses); - if (code == Coordination::Error::ZOK) - { - LOG_DEBUG(log, "Successfully fixed metadata_version"); - return stat.version; - } - if (code == Coordination::Error::ZBADVERSION) - { - LOG_DEBUG(log, "No need to update metadata_version because table metadata has been updated on a different replica"); - return metadata_version; - } - throw zkutil::KeeperException(code); -} - } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 15cfa77302b..c10f66031ef 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -1026,8 +1026,6 @@ private: const bool & zero_copy_enabled, const bool & always_use_copy_instead_of_hardlinks, const ContextPtr & query_context); - - Int32 tryFixMetadataVersionInZooKeeper(); }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); From b30aabf635739e2ba67e137ca30ea839dff01192 Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky <43110995+evillique@users.noreply.github.com> Date: Wed, 11 Sep 2024 22:38:54 +0200 Subject: [PATCH 03/13] Fix attach of ReplicatedMergeTree tables in Replicated databases --- src/Storages/MergeTree/registerStorageMergeTree.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index 18ed7df9b5d..c7ff266f30f 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -277,7 +277,7 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs( if 
(has_valid_arguments) { - if (is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 0) + if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 0) { throw Exception(ErrorCodes::BAD_ARGUMENTS, "It's not allowed to specify explicit zookeeper_path and replica_name " @@ -285,7 +285,7 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs( "specify them explicitly, enable setting " "database_replicated_allow_replicated_engine_arguments."); } - else if (is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 1) + else if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 1) { LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "It's not recommended to explicitly specify " "zookeeper_path and replica_name in ReplicatedMergeTree arguments"); @@ -305,7 +305,7 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs( throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must be a string literal{}", verbose_help_message); - if (is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 2) + if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 2) { LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "Replacing user-provided ZooKeeper path and replica name ({}, {}) " "with default arguments", zookeeper_path, replica_name); From 8e3ba4bd6c3c859d51700b80f8fe6506418b3ec8 Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky <43110995+evillique@users.noreply.github.com> Date: Wed, 11 Sep 2024 22:59:06 +0200 Subject: [PATCH 04/13] Add test --- .../test_replicated_database/test.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) 
diff --git a/tests/integration/test_replicated_database/test.py b/tests/integration/test_replicated_database/test.py index 60a6e099b22..eec9a5a56e6 100644 --- a/tests/integration/test_replicated_database/test.py +++ b/tests/integration/test_replicated_database/test.py @@ -1549,3 +1549,25 @@ def test_all_groups_cluster(started_cluster): assert "bad_settings_node\ndummy_node\n" == bad_settings_node.query( "select host_name from system.clusters where name='all_groups.db_cluster' order by host_name" ) + + +def test_detach_attach_table(started_cluster): + main_node.query("DROP DATABASE IF EXISTS detach_attach_db SYNC") + main_node.query( + "CREATE DATABASE detach_attach_db ENGINE = Replicated('/clickhouse/databases/detach_attach_db');" + ) + main_node.query( + "CREATE TABLE detach_attach_db.detach_attach_table (k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k;" + ) + main_node.query( + "INSERT INTO detach_attach_db.detach_attach_table VALUES (1);" + ) + main_node.query( + "DETACH TABLE detach_attach_db.detach_attach_table PERMANENTLY;" + ) + main_node.query( + "ATTACH TABLE detach_attach_db.detach_attach_table;" + ) + assert ( + main_node.query("SELECT * FROM detach_attach_db.detach_attach_table;") == "1\n" + ) From 1da5729f89a2476e7f9cf9e6d31d010737379926 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Wed, 11 Sep 2024 21:08:33 +0000 Subject: [PATCH 05/13] Automatic style fix --- tests/integration/test_replicated_database/test.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/tests/integration/test_replicated_database/test.py b/tests/integration/test_replicated_database/test.py index eec9a5a56e6..533eb601ad6 100644 --- a/tests/integration/test_replicated_database/test.py +++ b/tests/integration/test_replicated_database/test.py @@ -1559,15 +1559,9 @@ def test_detach_attach_table(started_cluster): main_node.query( "CREATE TABLE detach_attach_db.detach_attach_table (k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k;" ) - main_node.query( 
- "INSERT INTO detach_attach_db.detach_attach_table VALUES (1);" - ) - main_node.query( - "DETACH TABLE detach_attach_db.detach_attach_table PERMANENTLY;" - ) - main_node.query( - "ATTACH TABLE detach_attach_db.detach_attach_table;" - ) + main_node.query("INSERT INTO detach_attach_db.detach_attach_table VALUES (1);") + main_node.query("DETACH TABLE detach_attach_db.detach_attach_table PERMANENTLY;") + main_node.query("ATTACH TABLE detach_attach_db.detach_attach_table;") assert ( main_node.query("SELECT * FROM detach_attach_db.detach_attach_table;") == "1\n" ) From 08fd6c8ab6df952775a978a3aebcfcd3bf4ecbf3 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Tue, 10 Sep 2024 10:30:34 +0800 Subject: [PATCH 06/13] have_compressed is lost in reuseJoinedData --- src/Interpreters/HashJoin/HashJoin.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Interpreters/HashJoin/HashJoin.cpp b/src/Interpreters/HashJoin/HashJoin.cpp index 1b8b45b94ea..230e4cd9691 100644 --- a/src/Interpreters/HashJoin/HashJoin.cpp +++ b/src/Interpreters/HashJoin/HashJoin.cpp @@ -1236,6 +1236,7 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block, void HashJoin::reuseJoinedData(const HashJoin & join) { + have_compressed = join.have_compressed; data = join.data; from_storage_join = true; From 51f32450300d79736bf86513b2118e9f1398c0cc Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Fri, 13 Sep 2024 11:43:59 +0000 Subject: [PATCH 07/13] A better fix for #67476 Throw an exception instead of silently ignoring two conflicting settings. 
--- src/Common/ErrorCodes.cpp | 1 + src/Interpreters/executeQuery.cpp | 29 ++++++++++-------- .../02494_query_cache_bugs.reference | 20 ------------- .../0_stateless/02494_query_cache_bugs.sql | 30 +++++++------------ 4 files changed, 27 insertions(+), 53 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 1055b3d34db..09a5375191b 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -609,6 +609,7 @@ M(728, UNEXPECTED_DATA_TYPE) \ M(729, ILLEGAL_TIME_SERIES_TAGS) \ M(730, REFRESH_FAILED) \ + M(731, QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE) \ \ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index be9423852c1..3260ea890c6 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -99,6 +99,7 @@ namespace DB namespace ErrorCodes { extern const int QUERY_CACHE_USED_WITH_NONDETERMINISTIC_FUNCTIONS; + extern const int QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE; extern const int QUERY_CACHE_USED_WITH_SYSTEM_TABLE; extern const int INTO_OUTFILE_NOT_ALLOWED; extern const int INVALID_TRANSACTION; @@ -1118,22 +1119,24 @@ static std::tuple executeQueryImpl( && settings.use_query_cache && !internal && client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY - /// Bug 67476: Avoid that the query cache stores truncated results if the query ran with a non-THROW overflow mode and hit a limit. - /// This is more workaround than a fix ... unfortunately it is hard to detect from the perspective of the query cache that the - /// query result is truncated. 
- && (settings.read_overflow_mode == OverflowMode::THROW - && settings.read_overflow_mode_leaf == OverflowMode::THROW - && settings.group_by_overflow_mode == OverflowMode::THROW - && settings.sort_overflow_mode == OverflowMode::THROW - && settings.result_overflow_mode == OverflowMode::THROW - && settings.timeout_overflow_mode == OverflowMode::THROW - && settings.set_overflow_mode == OverflowMode::THROW - && settings.join_overflow_mode == OverflowMode::THROW - && settings.transfer_overflow_mode == OverflowMode::THROW - && settings.distinct_overflow_mode == OverflowMode::THROW) && (ast->as() || ast->as()); QueryCache::Usage query_cache_usage = QueryCache::Usage::None; + /// Bug 67476: If the query runs with a non-THROW overflow mode and hits a limit, the query cache will store a truncated result (if + /// enabled). This is incorrect. Unfortunately it is hard to detect from the perspective of the query cache that the query result + /// is truncated. Therefore throw an exception, to notify the user to disable either the query cache or use another overflow mode. + if (settings.use_query_cache && (settings.read_overflow_mode != OverflowMode::THROW + || settings.read_overflow_mode_leaf != OverflowMode::THROW + || settings.group_by_overflow_mode != OverflowMode::THROW + || settings.sort_overflow_mode != OverflowMode::THROW + || settings.result_overflow_mode != OverflowMode::THROW + || settings.timeout_overflow_mode != OverflowMode::THROW + || settings.set_overflow_mode != OverflowMode::THROW + || settings.join_overflow_mode != OverflowMode::THROW + || settings.transfer_overflow_mode != OverflowMode::THROW + || settings.distinct_overflow_mode != OverflowMode::THROW)) + throw Exception(ErrorCodes::QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE, "use_query_cache and overflow_mode != 'throw' cannot be used together"); + /// If the query runs with "use_query_cache = 1", we first probe if the query cache already contains the query result (if yes: /// return result from cache). 
If doesn't, we execute the query normally and write the result into the query cache. Both steps use a /// hash of the AST, the current database and the settings as cache key. Unfortunately, the settings are in some places internally diff --git a/tests/queries/0_stateless/02494_query_cache_bugs.reference b/tests/queries/0_stateless/02494_query_cache_bugs.reference index ea9017d5394..d50e9c42204 100644 --- a/tests/queries/0_stateless/02494_query_cache_bugs.reference +++ b/tests/queries/0_stateless/02494_query_cache_bugs.reference @@ -23,23 +23,3 @@ Row 1: x: 1 2 -- Bug 67476: Queries with overflow mode != throw must not be cached by the query cache -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 diff --git a/tests/queries/0_stateless/02494_query_cache_bugs.sql b/tests/queries/0_stateless/02494_query_cache_bugs.sql index 423068aa646..755a5fae924 100644 --- a/tests/queries/0_stateless/02494_query_cache_bugs.sql +++ b/tests/queries/0_stateless/02494_query_cache_bugs.sql @@ -43,25 +43,15 @@ DROP TABLE IF EXISTS tab; CREATE TABLE tab(c UInt64) ENGINE = Memory; SYSTEM DROP QUERY CACHE; -SELECT sum(c) FROM tab SETTINGS read_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS read_overflow_mode_leaf = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS group_by_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS sort_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS result_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS timeout_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS set_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from 
system.query_cache; -SELECT sum(c) FROM tab SETTINGS join_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS transfer_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS distinct_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; +SELECT sum(c) FROM tab SETTINGS read_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS read_overflow_mode_leaf = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS group_by_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS sort_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS result_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS timeout_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS set_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS join_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS transfer_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS distinct_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } SYSTEM DROP QUERY CACHE; From 1b1db0081f66068d1bccf3d7963cb872369468f6 Mon Sep 17 
00:00:00 2001 From: Michael Stetsyuk Date: Tue, 10 Sep 2024 22:39:35 +0000 Subject: [PATCH 08/13] do not fix metadata_version if replica is read_only --- src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index f28b8f9e9a8..67570d78366 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -122,6 +122,7 @@ Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil: { const String & zookeeper_path = storage.zookeeper_path; const String & replica_path = storage.replica_path; + const bool replica_readonly = storage.is_readonly; for (size_t i = 0; i != 2; ++i) { @@ -132,7 +133,7 @@ Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil: const Int32 metadata_version = parse(replica_metadata_version_str); - if (metadata_version != 0) + if (metadata_version != 0 || replica_readonly) { /// No need to fix anything return metadata_version; From 721e9a735672fad9f70150434c8ace7a9358fae3 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Fri, 13 Sep 2024 12:38:58 +0000 Subject: [PATCH 09/13] empty From 42670a46d49719efc5caae1ca23b6d95360fd02d Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 17:00:46 +0100 Subject: [PATCH 10/13] impl --- .../QueryPlan/ReadFromMergeTree.cpp | 27 ------- .../__init__.py | 0 .../configs/remote_servers.xml | 33 +++++++++ .../test.py | 74 +++++++++++++++++++ 4 files changed, 107 insertions(+), 27 deletions(-) create mode 100644 tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py create mode 100644 tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml create mode 100644 tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py diff 
--git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 218f0a61a48..0b96cc57274 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -2009,33 +2009,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons { auto result = getAnalysisResult(); - if (is_parallel_reading_from_replicas && context->canUseParallelReplicasOnInitiator() - && context->getSettingsRef().parallel_replicas_local_plan) - { - CoordinationMode mode = CoordinationMode::Default; - switch (result.read_type) - { - case ReadFromMergeTree::ReadType::Default: - mode = CoordinationMode::Default; - break; - case ReadFromMergeTree::ReadType::InOrder: - mode = CoordinationMode::WithOrder; - break; - case ReadFromMergeTree::ReadType::InReverseOrder: - mode = CoordinationMode::ReverseOrder; - break; - case ReadFromMergeTree::ReadType::ParallelReplicas: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Read type can't be ParallelReplicas on initiator"); - } - - chassert(number_of_current_replica.has_value()); - chassert(all_ranges_callback.has_value()); - - /// initialize working set from local replica - all_ranges_callback.value()( - InitialAllRangesAnnouncement(mode, result.parts_with_ranges.getDescriptions(), number_of_current_replica.value())); - } - if (enable_remove_parts_from_snapshot_optimization) { /// Do not keep data parts in snapshot. 
diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml b/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml new file mode 100644 index 00000000000..734acf5f363 --- /dev/null +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml @@ -0,0 +1,33 @@ + + + + + false + + node0 + 9000 + + + node1 + 9000 + + + node2 + 9000 + + + node3 + 9000 + + + node4 + 9000 + + + node5 + 9000 + + + + + diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py new file mode 100644 index 00000000000..2b3b94f95dc --- /dev/null +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -0,0 +1,74 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +nodes = [ + cluster.add_instance( + f"node{num}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True + ) + for num in range(6) +] + + +@pytest.fixture(scope="module", autouse=True) +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def _create_tables(table_name): + for idx, node in enumerate(nodes): + node.query( + f"DROP TABLE IF EXISTS {table_name}", + settings={"database_atomic_wait_for_drop_and_detach_synchronously": True}, + ) + + node.query( + f""" + CREATE TABLE {table_name} (value Int64) + Engine=ReplicatedMergeTree('/test_parallel_replicas/shard/{table_name}', '{idx}') + ORDER BY () + """ + ) + + nodes[0].query( + f"INSERT INTO {table_name} SELECT * FROM numbers(1000)", + settings={"insert_deduplicate": 0}, + ) + nodes[0].query(f"SYSTEM SYNC 
REPLICA ON CLUSTER 'parallel_replicas' {table_name}") + + for idx, node in enumerate(nodes): + node.query("SYSTEM STOP REPLICATED SENDS") + # the same data on all nodes except for a single value + node.query( + f"INSERT INTO {table_name} VALUES ({idx})", + settings={"insert_deduplicate": 0}, + ) + + +def test_number_of_marks_read(start_cluster): + if nodes[0].is_built_with_sanitizer(): + pytest.skip("Disabled for sanitizers (too slow)") + + table_name = "t" + _create_tables(table_name) + + for idx, node in enumerate(nodes): + expected = 499500 + idx # sum of all integers 0..999 + idx + assert ( + node.query( + "SELECT sum(value) FROM t", + settings={ + "allow_experimental_parallel_reading_from_replicas": 2, + "max_parallel_replicas": 100, + "cluster_for_parallel_replicas": "parallel_replicas", + "parallel_replicas_local_plan": True, + }, + ) + == f"{expected}\n" + ) From de78992966ac73825dd597e0f0d168b83f71360d Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 18:25:51 +0200 Subject: [PATCH 11/13] Update tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py Co-authored-by: Igor Nikonov <954088+devcrafter@users.noreply.github.com> --- .../test_parallel_replicas_snapshot_from_initiator/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py index 2b3b94f95dc..5da0a57ee7e 100644 --- a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -62,7 +62,7 @@ def test_number_of_marks_read(start_cluster): expected = 499500 + idx # sum of all integers 0..999 + idx assert ( node.query( - "SELECT sum(value) FROM t", + "SELECT sum(value) FROM {table_name}", settings={ "allow_experimental_parallel_reading_from_replicas": 2, "max_parallel_replicas": 100, From 
57a6a64d8c622fc42b2e3cb16a0f9f23783bab46 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 17:31:28 +0100 Subject: [PATCH 12/13] fix --- .../test_parallel_replicas_snapshot_from_initiator/test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py index 5da0a57ee7e..ec962c7cb32 100644 --- a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -51,10 +51,9 @@ def _create_tables(table_name): ) -def test_number_of_marks_read(start_cluster): - if nodes[0].is_built_with_sanitizer(): - pytest.skip("Disabled for sanitizers (too slow)") - +# check that we use the state of data parts from the initiator node (for some sort of determinism of what is being read). +# currently it is implemented only when we build a local plan for the initiator node (we aim to make this behavior the default) +def test_initiator_snapshot_is_used_for_reading(start_cluster): table_name = "t" _create_tables(table_name) From 8fd9345d2d7fae6711b4733f658d330df2edffc2 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 21:26:58 +0100 Subject: [PATCH 13/13] fix --- .../test_parallel_replicas_snapshot_from_initiator/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py index ec962c7cb32..a7e7e99455b 100644 --- a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -61,7 +61,7 @@ def test_initiator_snapshot_is_used_for_reading(start_cluster): expected = 499500 + idx # sum of all integers 0..999 + idx assert ( node.query( - "SELECT sum(value) FROM
{table_name}", + f"SELECT sum(value) FROM {table_name}", settings={ "allow_experimental_parallel_reading_from_replicas": 2, "max_parallel_replicas": 100,