diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 1055b3d34db..09a5375191b 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -609,6 +609,7 @@ M(728, UNEXPECTED_DATA_TYPE) \ M(729, ILLEGAL_TIME_SERIES_TAGS) \ M(730, REFRESH_FAILED) \ + M(731, QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE) \ \ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ diff --git a/src/Interpreters/HashJoin/HashJoin.cpp b/src/Interpreters/HashJoin/HashJoin.cpp index 1b8b45b94ea..230e4cd9691 100644 --- a/src/Interpreters/HashJoin/HashJoin.cpp +++ b/src/Interpreters/HashJoin/HashJoin.cpp @@ -1236,6 +1236,7 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block, void HashJoin::reuseJoinedData(const HashJoin & join) { + have_compressed = join.have_compressed; data = join.data; from_storage_join = true; diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index be9423852c1..3260ea890c6 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -99,6 +99,7 @@ namespace DB namespace ErrorCodes { extern const int QUERY_CACHE_USED_WITH_NONDETERMINISTIC_FUNCTIONS; + extern const int QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE; extern const int QUERY_CACHE_USED_WITH_SYSTEM_TABLE; extern const int INTO_OUTFILE_NOT_ALLOWED; extern const int INVALID_TRANSACTION; @@ -1118,22 +1119,24 @@ static std::tuple executeQueryImpl( && settings.use_query_cache && !internal && client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY - /// Bug 67476: Avoid that the query cache stores truncated results if the query ran with a non-THROW overflow mode and hit a limit. - /// This is more workaround than a fix ... unfortunately it is hard to detect from the perspective of the query cache that the - /// query result is truncated. - && (settings.read_overflow_mode == OverflowMode::THROW - && settings.read_overflow_mode_leaf == OverflowMode::THROW - && settings.group_by_overflow_mode == OverflowMode::THROW - && settings.sort_overflow_mode == OverflowMode::THROW - && settings.result_overflow_mode == OverflowMode::THROW - && settings.timeout_overflow_mode == OverflowMode::THROW - && settings.set_overflow_mode == OverflowMode::THROW - && settings.join_overflow_mode == OverflowMode::THROW - && settings.transfer_overflow_mode == OverflowMode::THROW - && settings.distinct_overflow_mode == OverflowMode::THROW) && (ast->as() || ast->as()); QueryCache::Usage query_cache_usage = QueryCache::Usage::None; + /// Bug 67476: If the query runs with a non-THROW overflow mode and hits a limit, the query cache will store a truncated result (if + /// enabled). This is incorrect. Unfortunately it is hard to detect from the perspective of the query cache that the query result + /// is truncated. Therefore throw an exception, to notify the user to disable either the query cache or use another overflow mode. + if (settings.use_query_cache && (settings.read_overflow_mode != OverflowMode::THROW + || settings.read_overflow_mode_leaf != OverflowMode::THROW + || settings.group_by_overflow_mode != OverflowMode::THROW + || settings.sort_overflow_mode != OverflowMode::THROW + || settings.result_overflow_mode != OverflowMode::THROW + || settings.timeout_overflow_mode != OverflowMode::THROW + || settings.set_overflow_mode != OverflowMode::THROW + || settings.join_overflow_mode != OverflowMode::THROW + || settings.transfer_overflow_mode != OverflowMode::THROW + || settings.distinct_overflow_mode != OverflowMode::THROW)) + throw Exception(ErrorCodes::QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE, "use_query_cache and overflow_mode != 'throw' cannot be used together"); + /// If the query runs with "use_query_cache = 1", we first probe if the query cache already contains the query result (if yes: /// return result from cache). If doesn't, we execute the query normally and write the result into the query cache. Both steps use a /// hash of the AST, the current database and the settings as cache key. Unfortunately, the settings are in some places internally diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 218f0a61a48..0b96cc57274 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -2009,33 +2009,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons { auto result = getAnalysisResult(); - if (is_parallel_reading_from_replicas && context->canUseParallelReplicasOnInitiator() - && context->getSettingsRef().parallel_replicas_local_plan) - { - CoordinationMode mode = CoordinationMode::Default; - switch (result.read_type) - { - case ReadFromMergeTree::ReadType::Default: - mode = CoordinationMode::Default; - break; - case ReadFromMergeTree::ReadType::InOrder: - mode = CoordinationMode::WithOrder; - break; - case ReadFromMergeTree::ReadType::InReverseOrder: - mode = CoordinationMode::ReverseOrder; - break; - case ReadFromMergeTree::ReadType::ParallelReplicas: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Read type can't be ParallelReplicas on initiator"); - } - - chassert(number_of_current_replica.has_value()); - chassert(all_ranges_callback.has_value()); - - /// initialize working set from local replica - all_ranges_callback.value()( - InitialAllRangesAnnouncement(mode, result.parts_with_ranges.getDescriptions(), number_of_current_replica.value())); - } - if (enable_remove_parts_from_snapshot_optimization) { /// Do not keep data parts in snapshot. diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index 6e22a3515bc..67570d78366 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -15,6 +15,7 @@ namespace ErrorCodes { extern const int SUPPORT_IS_DISABLED; extern const int REPLICA_STATUS_CHANGED; + extern const int LOGICAL_ERROR; } ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_) @@ -117,6 +118,67 @@ void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const z } } +Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper) +{ + const String & zookeeper_path = storage.zookeeper_path; + const String & replica_path = storage.replica_path; + const bool replica_readonly = storage.is_readonly; + + for (size_t i = 0; i != 2; ++i) + { + String replica_metadata_version_str; + const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version_str); + if (!replica_metadata_version_exists) + return -1; + + const Int32 metadata_version = parse(replica_metadata_version_str); + + if (metadata_version != 0 || replica_readonly) + { + /// No need to fix anything + return metadata_version; + } + + Coordination::Stat stat; + zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); + if (stat.version == 0) + { + /// No need to fix anything + return metadata_version; + } + + ReplicatedMergeTreeQueue & queue = storage.queue; + queue.pullLogsToQueue(zookeeper); + if (queue.getStatus().metadata_alters_in_queue != 0) + { + LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); + return metadata_version; + } + + const Coordination::Requests ops = { + zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), + zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), + }; + Coordination::Responses ops_responses; + const auto code = zookeeper->tryMulti(ops, ops_responses); + if (code == Coordination::Error::ZOK) + { + LOG_DEBUG(log, "Successfully set metadata_version to {}", stat.version); + return stat.version; + } + if (code != Coordination::Error::ZBADVERSION) + { + throw zkutil::KeeperException(code); + } + } + + /// Second attempt is only possible if metadata_version != 0 or metadata.version changed during the first attempt. + /// If metadata_version != 0, on second attempt we will return the new metadata_version. + /// If metadata.version changed, on second attempt we will either get metadata_version != 0 and return the new metadata_version or we will get metadata_alters_in_queue != 0 and return 0. + /// Either way, on second attempt this method should return. + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fix replica metadata_version in ZooKeeper after two attempts"); +} + void ReplicatedMergeTreeAttachThread::runImpl() { storage.setZooKeeper(); @@ -160,11 +222,11 @@ void ReplicatedMergeTreeAttachThread::runImpl() /// Just in case it was not removed earlier due to connection loss zookeeper->tryRemove(replica_path + "/flags/force_restore_data"); - String replica_metadata_version; - const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version); + const Int32 replica_metadata_version = fixReplicaMetadataVersionIfNeeded(zookeeper); + const bool replica_metadata_version_exists = replica_metadata_version != -1; if (replica_metadata_version_exists) { - storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse(replica_metadata_version))); + storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(replica_metadata_version)); } else { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h index 250a5ed34d1..bfc97442598 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h @@ -48,6 +48,8 @@ private: void runImpl(); void finalizeInitialization(); + + Int32 fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper); }; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 7d8018e7577..0fa2be6a389 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -2222,6 +2222,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.inserts_in_queue = 0; res.merges_in_queue = 0; res.part_mutations_in_queue = 0; + res.metadata_alters_in_queue = 0; res.queue_oldest_time = 0; res.inserts_oldest_time = 0; res.merges_oldest_time = 0; @@ -2264,6 +2265,11 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.oldest_part_to_mutate_to = entry->new_part_name; } } + + if (entry->type == LogEntry::ALTER_METADATA) + { + ++res.metadata_alters_in_queue; + } } return res; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 91a23b6a3b6..9d3349663e2 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -473,6 +473,7 @@ public: UInt32 inserts_in_queue; UInt32 merges_in_queue; UInt32 part_mutations_in_queue; + UInt32 metadata_alters_in_queue; UInt32 queue_oldest_time; UInt32 inserts_oldest_time; UInt32 merges_oldest_time; diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index 18ed7df9b5d..c7ff266f30f 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -277,7 +277,7 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs( if (has_valid_arguments) { - if (is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 0) + if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 0) { throw Exception(ErrorCodes::BAD_ARGUMENTS, "It's not allowed to specify explicit zookeeper_path and replica_name " @@ -285,7 +285,7 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs( "specify them explicitly, enable setting " "database_replicated_allow_replicated_engine_arguments."); } - else if (is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 1) + else if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 1) { LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "It's not recommended to explicitly specify " "zookeeper_path and replica_name in ReplicatedMergeTree arguments"); @@ -305,7 +305,7 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs( throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must be a string literal{}", verbose_help_message); - if (is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 2) + if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 2) { LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "Replacing user-provided ZooKeeper path and replica name ({}, {}) " "with default arguments", zookeeper_path, replica_name); diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml b/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml new file mode 100644 index 00000000000..734acf5f363 --- /dev/null +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml @@ -0,0 +1,33 @@ + + + + + false + + node0 + 9000 + + + node1 + 9000 + + + node2 + 9000 + + + node3 + 9000 + + + node4 + 9000 + + + node5 + 9000 + + + + + diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py new file mode 100644 index 00000000000..a7e7e99455b --- /dev/null +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -0,0 +1,73 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +nodes = [ + cluster.add_instance( + f"node{num}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True + ) + for num in range(6) +] + + +@pytest.fixture(scope="module", autouse=True) +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def _create_tables(table_name): + for idx, node in enumerate(nodes): + node.query( + f"DROP TABLE IF EXISTS {table_name}", + settings={"database_atomic_wait_for_drop_and_detach_synchronously": True}, + ) + + node.query( + f""" + CREATE TABLE {table_name} (value Int64) + Engine=ReplicatedMergeTree('/test_parallel_replicas/shard/{table_name}', '{idx}') + ORDER BY () + """ + ) + + nodes[0].query( + f"INSERT INTO {table_name} SELECT * FROM numbers(1000)", + settings={"insert_deduplicate": 0}, + ) + nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}") + + for idx, node in enumerate(nodes): + node.query("SYSTEM STOP REPLICATED SENDS") + # the same data on all nodes except for a single value + node.query( + f"INSERT INTO {table_name} VALUES ({idx})", + settings={"insert_deduplicate": 0}, + ) + + +# check that we use the state of data parts from the initiator node (for some sort of determinism of what is been read). +# currently it is implemented only when we build local plan for the initiator node (we aim to make this behavior default) +def test_initiator_snapshot_is_used_for_reading(start_cluster): + table_name = "t" + _create_tables(table_name) + + for idx, node in enumerate(nodes): + expected = 499500 + idx # sum of all integers 0..999 + idx + assert ( + node.query( + f"SELECT sum(value) FROM {table_name}", + settings={ + "allow_experimental_parallel_reading_from_replicas": 2, + "max_parallel_replicas": 100, + "cluster_for_parallel_replicas": "parallel_replicas", + "parallel_replicas_local_plan": True, + }, + ) + == f"{expected}\n" + ) diff --git a/tests/integration/test_replicated_database/test.py b/tests/integration/test_replicated_database/test.py index 60a6e099b22..533eb601ad6 100644 --- a/tests/integration/test_replicated_database/test.py +++ b/tests/integration/test_replicated_database/test.py @@ -1549,3 +1549,19 @@ def test_all_groups_cluster(started_cluster): assert "bad_settings_node\ndummy_node\n" == bad_settings_node.query( "select host_name from system.clusters where name='all_groups.db_cluster' order by host_name" ) + + +def test_detach_attach_table(started_cluster): + main_node.query("DROP DATABASE IF EXISTS detach_attach_db SYNC") + main_node.query( + "CREATE DATABASE detach_attach_db ENGINE = Replicated('/clickhouse/databases/detach_attach_db');" + ) + main_node.query( + "CREATE TABLE detach_attach_db.detach_attach_table (k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k;" + ) + main_node.query("INSERT INTO detach_attach_db.detach_attach_table VALUES (1);") + main_node.query("DETACH TABLE detach_attach_db.detach_attach_table PERMANENTLY;") + main_node.query("ATTACH TABLE detach_attach_db.detach_attach_table;") + assert ( + main_node.query("SELECT * FROM detach_attach_db.detach_attach_table;") == "1\n" + ) diff --git a/tests/queries/0_stateless/02494_query_cache_bugs.reference b/tests/queries/0_stateless/02494_query_cache_bugs.reference index ea9017d5394..d50e9c42204 100644 --- a/tests/queries/0_stateless/02494_query_cache_bugs.reference +++ b/tests/queries/0_stateless/02494_query_cache_bugs.reference @@ -23,23 +23,3 @@ Row 1: x: 1 2 -- Bug 67476: Queries with overflow mode != throw must not be cached by the query cache -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 diff --git a/tests/queries/0_stateless/02494_query_cache_bugs.sql b/tests/queries/0_stateless/02494_query_cache_bugs.sql index 423068aa646..755a5fae924 100644 --- a/tests/queries/0_stateless/02494_query_cache_bugs.sql +++ b/tests/queries/0_stateless/02494_query_cache_bugs.sql @@ -43,25 +43,15 @@ DROP TABLE IF EXISTS tab; CREATE TABLE tab(c UInt64) ENGINE = Memory; SYSTEM DROP QUERY CACHE; -SELECT sum(c) FROM tab SETTINGS read_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS read_overflow_mode_leaf = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS group_by_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS sort_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS result_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS timeout_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS set_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS join_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS transfer_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS distinct_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; +SELECT sum(c) FROM tab SETTINGS read_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS read_overflow_mode_leaf = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS group_by_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS sort_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS result_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS timeout_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS set_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS join_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS transfer_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS distinct_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } SYSTEM DROP QUERY CACHE;