From 59eb3aa9a9662b6a2930453f3909a271c4a8198f Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 20 Aug 2021 15:59:57 +0300 Subject: [PATCH 01/23] avoid too long waiting for inactive replicas --- src/Core/Settings.h | 3 +- .../PartMovesBetweenShardsOrchestrator.cpp | 10 +- src/Storages/StorageReplicatedMergeTree.cpp | 137 ++++++++++++------ src/Storages/StorageReplicatedMergeTree.h | 10 +- tests/config/users.d/timeouts.xml | 4 + 5 files changed, 109 insertions(+), 55 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 30f3c69ff4a..f154b1ccbd5 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -97,7 +97,7 @@ class IColumn; M(Bool, optimize_move_to_prewhere_if_final, false, "If query has `FINAL`, the optimization `move_to_prewhere` is not always correct and it is enabled only if both settings `optimize_move_to_prewhere` and `optimize_move_to_prewhere_if_final` are turned on", 0) \ \ M(UInt64, replication_alter_partitions_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) \ - M(UInt64, replication_alter_columns_timeout, 60, "Wait for actions to change the table structure within the specified number of seconds. 0 - wait unlimited time.", 0) \ + M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - wait unlimited time, negative - do not wait.", 0) \ \ M(LoadBalancing, load_balancing, LoadBalancing::RANDOM, "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.", 0) \ M(UInt64, load_balancing_first_offset, 0, "Which replica to preferably send a query when FIRST_OR_RANDOM load balancing strategy is used.", 0) \ @@ -508,6 +508,7 @@ class IColumn; M(Bool, allow_experimental_window_functions, true, "Obsolete setting, does nothing.", 0) \ M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "Obsolete setting, does nothing.", 0) \ M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing.", 0) \ + M(UInt64, replication_alter_columns_timeout, 60, "Obsolete setting, does nothing.", 0) \ /** The section above is for obsolete settings. Do not add anything there. */ diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp index 4c187109ac6..7297266bd30 100644 --- a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp @@ -194,7 +194,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: /// This wait in background schedule pool is useless. It'd be /// better to have some notification which will call `step` /// function when all replicated will finish. TODO. 
- storage.waitForAllReplicasToProcessLogEntry(log_entry, true); + storage.waitForAllReplicasToProcessLogEntry(log_entry, 0); } { @@ -231,7 +231,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true); + storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -269,7 +269,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true); + storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -318,7 +318,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true); + storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -348,7 +348,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: { ReplicatedMergeTreeLogEntry log_entry; if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false)) - storage.waitForAllReplicasToProcessLogEntry(log_entry, true); + storage.waitForAllReplicasToProcessLogEntry(log_entry, 0); } { diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 98ce2ac73e1..165a7998cfa 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4719,11 +4719,15 @@ bool StorageReplicatedMergeTree::optimize( } } - if (query_context->getSettingsRef().replication_alter_partitions_sync != 0) + table_lock.reset(); + + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; + for (auto & merge_entry : merge_entries) { - /// NOTE Table lock must not be held while waiting. Some combination of R-W-R locks from different threads will yield to deadlock. - for (auto & merge_entry : merge_entries) - waitForAllReplicasToProcessLogEntry(merge_entry, false); + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + waitForReplicaToProcessLogEntry(replica_name, merge_entry, wait_for_inactive_timeout); + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + waitForAllReplicasToProcessLogEntry(merge_entry, wait_for_inactive_timeout); } return true; @@ -5036,15 +5040,16 @@ void StorageReplicatedMergeTree::alter( table_lock_holder.reset(); std::vector unwaited; + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) { LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. 
Waiting for replicas to apply changes."); - unwaited = waitForAllReplicasToProcessLogEntry(*alter_entry, false); + unwaited = tryWaitForAllReplicasToProcessLogEntry(*alter_entry, wait_for_inactive_timeout); } else if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) { LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); - waitForReplicaToProcessLogEntry(replica_name, *alter_entry); + waitForReplicaToProcessLogEntry(replica_name, *alter_entry, wait_for_inactive_timeout); } if (!unwaited.empty()) @@ -5200,10 +5205,11 @@ void StorageReplicatedMergeTree::dropPart(const String & part_name, bool detach, dropPartImpl(zookeeper, part_name, entry, detach, /*throw_if_noop=*/ true); /// If necessary, wait until the operation is performed on itself or on all replicas. + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry); + waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry); + waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); } void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) @@ -5221,10 +5227,11 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de if (did_drop) { /// If necessary, wait until the operation is performed on itself or on all replicas. + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry); + waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry); + waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); cleanLastPartNode(partition_id); } @@ -5244,12 +5251,22 @@ void StorageReplicatedMergeTree::truncate( Strings partitions = zookeeper->getChildren(fs::path(zookeeper_path) / "block_numbers"); + std::vector> entries_to_wait; + entries_to_wait.reserve(partitions.size()); for (String & partition_id : partitions) { - LogEntry entry; + auto entry = std::make_unique(); + if (dropAllPartsInPartition(*zookeeper, partition_id, *entry, query_context, false)) + entries_to_wait.push_back(std::move(entry)); + } - if (dropAllPartsInPartition(*zookeeper, partition_id, entry, query_context, false)) - waitForAllReplicasToProcessLogEntry(entry); + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; + for (const auto & entry : entries_to_wait) + { + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + waitForReplicaToProcessLogEntry(replica_name, *entry, wait_for_inactive_timeout); + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + waitForAllReplicasToProcessLogEntry(*entry, wait_for_inactive_timeout); } } @@ -5408,19 +5425,20 @@ StorageReplicatedMergeTree::allocateBlockNumber( } -Strings StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry( - const String & table_zookeeper_path, const 
ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) +Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry( + const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { LOG_DEBUG(log, "Waiting for all replicas to process {}", entry.znode_name); auto zookeeper = getZooKeeper(); Strings replicas = zookeeper->getChildren(fs::path(table_zookeeper_path) / "replicas"); Strings unwaited; + bool wait_for_inactive = 0 <= wait_for_inactive_timeout; for (const String & replica : replicas) { - if (wait_for_non_active || zookeeper->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active")) + if (wait_for_inactive || zookeeper->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active")) { - if (!waitForTableReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_non_active)) + if (!waitForTableReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_inactive_timeout)) unwaited.push_back(replica); } else @@ -5433,16 +5451,31 @@ Strings StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry( return unwaited; } - -Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry( - const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) +void StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry( + const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { - return waitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_non_active); + Strings unfinished_replicas = tryWaitForAllTableReplicasToProcessLogEntry(table_zookeeper_path, entry, wait_for_inactive_timeout); + if (unfinished_replicas.empty()) + return; + + throw Exception(ErrorCodes::UNFINISHED, "Timeout exceeded while waiting for replicas {} to process entry {}. 
" + "Probably some replicas are inactive", fmt::join(unfinished_replicas, ", "), entry.znode_name); } +Strings StorageReplicatedMergeTree::tryWaitForAllReplicasToProcessLogEntry( + const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) +{ + return tryWaitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout); +} + +void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry( + const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) +{ + waitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout); +} bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( - const String & table_zookeeper_path, const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) + const String & table_zookeeper_path, const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { String entry_str = entry.toString(); String log_node_name; @@ -5460,18 +5493,24 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( */ bool waiting_itself = replica == replica_name; + bool wait_for_inactive = 0 <= wait_for_inactive_timeout; + Stopwatch time_waiting; const auto & stop_waiting = [&]() { bool stop_waiting_itself = waiting_itself && partial_shutdown_called; - bool stop_waiting_non_active = !wait_for_non_active && !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active"); - return is_dropped || stop_waiting_itself || stop_waiting_non_active; + bool timeout_exceeded = 0 < wait_for_inactive_timeout && wait_for_inactive_timeout < time_waiting.elapsedSeconds(); + bool stop_waiting_inactive = (!wait_for_inactive || timeout_exceeded) + && !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active"); + return is_dropped || stop_waiting_itself || stop_waiting_inactive; }; /// Don't recheck ZooKeeper too often constexpr auto event_wait_timeout_ms = 3000; - if (startsWith(entry.znode_name, "log-")) + if (!startsWith(entry.znode_name, "log-")) + throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR); + { /// Take the number from the node name `log-xxxxxxxxxx`. UInt64 log_index = parse(entry.znode_name.substr(entry.znode_name.size() - 10)); @@ -5480,13 +5519,17 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( LOG_DEBUG(log, "Waiting for {} to pull {} to queue", replica, log_node_name); /// Let's wait until entry gets into the replica queue. + bool pulled_to_queue = false; while (!stop_waiting()) { zkutil::EventPtr event = std::make_shared(); String log_pointer = getZooKeeper()->get(fs::path(table_zookeeper_path) / "replicas" / replica / "log_pointer", nullptr, event); if (!log_pointer.empty() && parse(log_pointer) > log_index) + { + pulled_to_queue = true; break; + } /// Wait with timeout because we can be already shut down, but not dropped. /// So log_pointer node will exist, but we will never update it because all background threads already stopped. @@ -5494,9 +5537,10 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( /// but the query will never finish because the drop already shut down the table. 
event->tryWait(event_wait_timeout_ms); } + + if (!pulled_to_queue) + return false; } - else - throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR); LOG_DEBUG(log, "Looking for node corresponding to {} in {} queue", log_node_name, replica); @@ -5535,9 +5579,9 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry( - const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) + const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { - return waitForTableReplicaToProcessLogEntry(zookeeper_path, replica, entry, wait_for_non_active); + return waitForTableReplicaToProcessLogEntry(zookeeper_path, replica, entry, wait_for_inactive_timeout); } @@ -6548,13 +6592,15 @@ void StorageReplicatedMergeTree::replacePartitionFrom( parts_to_remove.clear(); cleanup_thread.wakeup(); + lock2.reset(); + lock1.reset(); + /// If necessary, wait until the operation is performed on all replicas. - if (query_context->getSettingsRef().replication_alter_partitions_sync > 1) - { - lock2.reset(); - lock1.reset(); - waitForAllReplicasToProcessLogEntry(entry); - } + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); } void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) @@ -6732,12 +6778,13 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta parts_to_remove.clear(); cleanup_thread.wakeup(); + lock2.reset(); - if (query_context->getSettingsRef().replication_alter_partitions_sync > 1) - { - lock2.reset(); - dest_table_storage->waitForAllReplicasToProcessLogEntry(entry); - } + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); /// Create DROP_RANGE for the source table alter_partition_version_path = zookeeper_path + "/alter_partition_version"; @@ -6764,11 +6811,11 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta log_znode_path = dynamic_cast(*op_results.front()).path_created; entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - if (query_context->getSettingsRef().replication_alter_partitions_sync > 1) - { - lock1.reset(); - waitForAllReplicasToProcessLogEntry(entry_delete); - } + lock1.reset(); + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + waitForReplicaToProcessLogEntry(replica_name, entry_delete, wait_for_inactive_timeout); + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + waitForAllReplicasToProcessLogEntry(entry_delete, wait_for_inactive_timeout); /// Cleaning possibly stored information about parts from 
/quorum/last_part node in ZooKeeper. cleanLastPartNode(partition_id); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 3d2727d7bb9..7a733c42427 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -638,14 +638,16 @@ private: * * One method for convenient use on current table, another for waiting on foreign shards. */ - Strings waitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); - Strings waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); + void waitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); + void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); + Strings tryWaitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); + Strings tryWaitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); /** Wait until the specified replica executes the specified action from the log. * NOTE: See comment about locks above. */ - bool waitForTableReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); - bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); + bool waitForTableReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); + bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); /// Throw an exception if the table is readonly. 
void assertNotReadonly() const; diff --git a/tests/config/users.d/timeouts.xml b/tests/config/users.d/timeouts.xml index 60b24cfdef8..6a1099c4404 100644 --- a/tests/config/users.d/timeouts.xml +++ b/tests/config/users.d/timeouts.xml @@ -6,6 +6,10 @@ 60 60000 + + 10 + + 30 From 9ef0b00803e67bf18051f118abd80d5fd66e082f Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 20 Aug 2021 17:12:42 +0300 Subject: [PATCH 02/23] fix config --- tests/config/config.d/merge_tree_settings.xml | 6 ++++++ tests/config/install.sh | 1 + tests/config/users.d/timeouts.xml | 2 -- 3 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 tests/config/config.d/merge_tree_settings.xml diff --git a/tests/config/config.d/merge_tree_settings.xml b/tests/config/config.d/merge_tree_settings.xml new file mode 100644 index 00000000000..8c57dc3acfd --- /dev/null +++ b/tests/config/config.d/merge_tree_settings.xml @@ -0,0 +1,6 @@ + + + + 10 + + diff --git a/tests/config/install.sh b/tests/config/install.sh index 571dff34018..ea8c3ec56c8 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -28,6 +28,7 @@ ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/ +ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/ diff --git a/tests/config/users.d/timeouts.xml b/tests/config/users.d/timeouts.xml index 6a1099c4404..7d58315bd94 100644 --- a/tests/config/users.d/timeouts.xml +++ b/tests/config/users.d/timeouts.xml @@ -6,8 +6,6 @@ 60 60000 - - 10 30 From cc9c2fd63b05be686b66c365d7a713c7909a1b05 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 23 Aug 2021 15:57:50 +0300 Subject: [PATCH 03/23] make code better --- src/Core/Settings.h | 2 +- .../PartMovesBetweenShardsOrchestrator.cpp | 10 +- src/Storages/StorageReplicatedMergeTree.cpp | 123 ++++++------------ src/Storages/StorageReplicatedMergeTree.h | 23 ++-- ...ical_result_after_merge_zookeeper_long.sql | 1 + ..._execute_merges_on_single_replica_long.sql | 1 + 6 files changed, 60 insertions(+), 100 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index f154b1ccbd5..e2e662a64d6 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -97,7 +97,7 @@ class IColumn; M(Bool, optimize_move_to_prewhere_if_final, false, "If query has `FINAL`, the optimization `move_to_prewhere` is not always correct and it is enabled only if both settings `optimize_move_to_prewhere` and `optimize_move_to_prewhere_if_final` are turned on", 0) \ \ M(UInt64, replication_alter_partitions_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) \ - M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - wait unlimited time, negative - do not wait.", 0) \ + M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. 
Time in seconds, 0 - do not wait, negative - wait for unlimited time.", 0) \ \ M(LoadBalancing, load_balancing, LoadBalancing::RANDOM, "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.", 0) \ M(UInt64, load_balancing_first_offset, 0, "Which replica to preferably send a query when FIRST_OR_RANDOM load balancing strategy is used.", 0) \ diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp index 7297266bd30..ca51353b1bc 100644 --- a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp @@ -194,7 +194,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: /// This wait in background schedule pool is useless. It'd be /// better to have some notification which will call `step` /// function when all replicated will finish. TODO. - storage.waitForAllReplicasToProcessLogEntry(log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, 0); } { @@ -231,7 +231,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -269,7 +269,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -318,7 +318,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -348,7 +348,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: { ReplicatedMergeTreeLogEntry log_entry; if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false)) - storage.waitForAllReplicasToProcessLogEntry(log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, 0); } { diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 165a7998cfa..a88e38873a5 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4721,14 +4721,8 @@ bool StorageReplicatedMergeTree::optimize( table_lock.reset(); - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; for (auto & merge_entry : merge_entries) - { - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, merge_entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(merge_entry, 
wait_for_inactive_timeout); - } + waitForLogEntryToBeProcessedIfNecessary(merge_entry, query_context); return true; } @@ -5039,21 +5033,8 @@ void StorageReplicatedMergeTree::alter( table_lock_holder.reset(); - std::vector unwaited; - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; - if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - { - LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); - unwaited = tryWaitForAllReplicasToProcessLogEntry(*alter_entry, wait_for_inactive_timeout); - } - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - { - LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); - waitForReplicaToProcessLogEntry(replica_name, *alter_entry, wait_for_inactive_timeout); - } - - if (!unwaited.empty()) - throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED); + LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); + waitForLogEntryToBeProcessedIfNecessary(*alter_entry, query_context, "Some replicas doesn't finish metadata alter: "); if (mutation_znode) { @@ -5204,12 +5185,7 @@ void StorageReplicatedMergeTree::dropPart(const String & part_name, bool detach, dropPartImpl(zookeeper, part_name, entry, detach, /*throw_if_noop=*/ true); - /// If necessary, wait until the operation is performed on itself or on all replicas. - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); + waitForLogEntryToBeProcessedIfNecessary(entry, query_context); } void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) @@ -5226,13 +5202,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de if (did_drop) { - /// If necessary, wait until the operation is performed on itself or on all replicas. 
- Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); - + waitForLogEntryToBeProcessedIfNecessary(entry, query_context); cleanLastPartNode(partition_id); } } @@ -5260,14 +5230,8 @@ void StorageReplicatedMergeTree::truncate( entries_to_wait.push_back(std::move(entry)); } - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; for (const auto & entry : entries_to_wait) - { - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, *entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(*entry, wait_for_inactive_timeout); - } + waitForLogEntryToBeProcessedIfNecessary(*entry, query_context); } @@ -5425,7 +5389,7 @@ StorageReplicatedMergeTree::allocateBlockNumber( } -Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry( +Strings StorageReplicatedMergeTree::tryWaitForAllReplicasToProcessLogEntry( const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { LOG_DEBUG(log, "Waiting for all replicas to process {}", entry.znode_name); @@ -5433,12 +5397,12 @@ Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry( auto zookeeper = getZooKeeper(); Strings replicas = zookeeper->getChildren(fs::path(table_zookeeper_path) / "replicas"); Strings unwaited; - bool wait_for_inactive = 0 <= wait_for_inactive_timeout; + bool wait_for_inactive = wait_for_inactive_timeout != 0; for (const String & replica : replicas) { if (wait_for_inactive || zookeeper->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active")) { - if (!waitForTableReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_inactive_timeout)) + if (!tryWaitForReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_inactive_timeout)) unwaited.push_back(replica); } else @@ -5451,30 +5415,37 @@ Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry( return unwaited; } -void StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry( - const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) +void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry( + const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout, const String & error_context) { - Strings unfinished_replicas = tryWaitForAllTableReplicasToProcessLogEntry(table_zookeeper_path, entry, wait_for_inactive_timeout); + Strings unfinished_replicas = tryWaitForAllReplicasToProcessLogEntry(table_zookeeper_path, entry, wait_for_inactive_timeout); if (unfinished_replicas.empty()) return; - throw Exception(ErrorCodes::UNFINISHED, "Timeout exceeded while waiting for replicas {} to process entry {}. 
" - "Probably some replicas are inactive", fmt::join(unfinished_replicas, ", "), entry.znode_name); + throw Exception(ErrorCodes::UNFINISHED, "{}Timeout exceeded while waiting for replicas {} to process entry {}. " + "Probably some replicas are inactive", error_context, fmt::join(unfinished_replicas, ", "), entry.znode_name); } -Strings StorageReplicatedMergeTree::tryWaitForAllReplicasToProcessLogEntry( - const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) +void StorageReplicatedMergeTree::waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context) { - return tryWaitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout); + /// If necessary, wait until the operation is performed on itself or on all replicas. + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + { + bool finished = tryWaitForReplicaToProcessLogEntry(zookeeper_path, replica_name, entry, wait_for_inactive_timeout); + if (!finished) + { + throw Exception(ErrorCodes::UNFINISHED, "{}Log entry {} is not precessed on local replica, " + "most likely because the replica was shut down.", error_context, entry.znode_name); + } + } + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + { + waitForAllReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout, error_context); + } } -void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry( - const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) -{ - waitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout); -} - -bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( +bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry( const String & table_zookeeper_path, const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { String entry_str = entry.toString(); @@ -5493,13 +5464,16 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( */ bool waiting_itself = replica == replica_name; - bool wait_for_inactive = 0 <= wait_for_inactive_timeout; + /// Do not wait if timeout is zero + bool wait_for_inactive = wait_for_inactive_timeout != 0; + /// Wait for unlimited time if timeout is negative + bool check_timeout = wait_for_inactive_timeout > 0; Stopwatch time_waiting; const auto & stop_waiting = [&]() { bool stop_waiting_itself = waiting_itself && partial_shutdown_called; - bool timeout_exceeded = 0 < wait_for_inactive_timeout && wait_for_inactive_timeout < time_waiting.elapsedSeconds(); + bool timeout_exceeded = check_timeout && wait_for_inactive_timeout < time_waiting.elapsedSeconds(); bool stop_waiting_inactive = (!wait_for_inactive || timeout_exceeded) && !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active"); return is_dropped || stop_waiting_itself || stop_waiting_inactive; @@ -5578,13 +5552,6 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( } -bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry( - const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) -{ - return waitForTableReplicaToProcessLogEntry(zookeeper_path, replica, entry, wait_for_inactive_timeout); -} - - void 
StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) { auto zookeeper = tryGetZooKeeper(); @@ -6595,12 +6562,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( lock2.reset(); lock1.reset(); - /// If necessary, wait until the operation is performed on all replicas. - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); + waitForLogEntryToBeProcessedIfNecessary(entry, query_context); } void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) @@ -6780,11 +6742,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta cleanup_thread.wakeup(); lock2.reset(); - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); + dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context); /// Create DROP_RANGE for the source table alter_partition_version_path = zookeeper_path + "/alter_partition_version"; @@ -6812,10 +6770,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); lock1.reset(); - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry_delete, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry_delete, wait_for_inactive_timeout); + waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context); /// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper. cleanLastPartNode(partition_id); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 7a733c42427..91f055d6aec 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -630,24 +630,27 @@ private: const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const; /** Wait until all replicas, including this, execute the specified action from the log. - * If replicas are added at the same time, it can not wait the added replica . + * If replicas are added at the same time, it can not wait the added replica. + * + * Waits for inactive replicas no more than wait_for_inactive_timeout. + * Returns list of inactive replicas that have not executed entry or throws exception. * * NOTE: This method must be called without table lock held. * Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock. - * TODO: There are wrong usages of this method that are not fixed yet. 
- * - * One method for convenient use on current table, another for waiting on foreign shards. */ - void waitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); - void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); - Strings tryWaitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); - Strings tryWaitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); + void waitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, + Int64 wait_for_inactive_timeout, const String & error_context = {}); + Strings tryWaitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, + Int64 wait_for_inactive_timeout); /** Wait until the specified replica executes the specified action from the log. * NOTE: See comment about locks above. */ - bool waitForTableReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); - bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); + bool tryWaitForReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, + const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); + + /// Depending on settings, do nothing or wait for this replica or all replicas process log entry. + void waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context = {}); /// Throw an exception if the table is readonly. 
void assertNotReadonly() const; diff --git a/tests/queries/0_stateless/00721_force_by_identical_result_after_merge_zookeeper_long.sql b/tests/queries/0_stateless/00721_force_by_identical_result_after_merge_zookeeper_long.sql index 50f51510d61..07fba5d39b4 100644 --- a/tests/queries/0_stateless/00721_force_by_identical_result_after_merge_zookeeper_long.sql +++ b/tests/queries/0_stateless/00721_force_by_identical_result_after_merge_zookeeper_long.sql @@ -12,6 +12,7 @@ SYSTEM SYNC REPLICA byte_identical_r2; ALTER TABLE byte_identical_r1 ADD COLUMN y UInt64 DEFAULT rand(); SYSTEM SYNC REPLICA byte_identical_r1; SYSTEM SYNC REPLICA byte_identical_r2; +SET replication_alter_partitions_sync=2; OPTIMIZE TABLE byte_identical_r1 PARTITION tuple() FINAL; SELECT x, t1.y - t2.y FROM byte_identical_r1 t1 SEMI LEFT JOIN byte_identical_r2 t2 USING x ORDER BY x; diff --git a/tests/queries/0_stateless/01532_execute_merges_on_single_replica_long.sql b/tests/queries/0_stateless/01532_execute_merges_on_single_replica_long.sql index 1acae560c93..85a2e893f37 100644 --- a/tests/queries/0_stateless/01532_execute_merges_on_single_replica_long.sql +++ b/tests/queries/0_stateless/01532_execute_merges_on_single_replica_long.sql @@ -98,6 +98,7 @@ SELECT '*** disable the feature'; ALTER TABLE execute_on_single_replica_r1 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0; ALTER TABLE execute_on_single_replica_r2 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0; +SET replication_alter_partitions_sync=2; /* all_0_0_6 - we disabled the feature, both replicas will merge */ OPTIMIZE TABLE execute_on_single_replica_r2 FINAL; /* all_0_0_7 - same */ From 4a4a0b482329c796d4b2d6fa979d021be85910bd Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 23 Aug 2021 16:32:07 +0300 Subject: [PATCH 04/23] fix --- .../MergeTree/PartMovesBetweenShardsOrchestrator.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp index ca51353b1bc..c227febbbc2 100644 --- a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp @@ -194,7 +194,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: /// This wait in background schedule pool is useless. It'd be /// better to have some notification which will call `step` /// function when all replicated will finish. TODO. 
- storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, -1); } { @@ -231,7 +231,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1); } { @@ -269,7 +269,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1); } { @@ -318,7 +318,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1); } { @@ -348,7 +348,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: { ReplicatedMergeTreeLogEntry log_entry; if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false)) - storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, -1); } { From e44f9cd42d77e9e275401aa96ed1eb16433d640f Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 25 Aug 2021 21:11:52 +0300 Subject: [PATCH 05/23] ClickHouse Keeper: Fix endless logs when rotate_interval changed --- src/Coordination/Changelog.cpp | 60 ++++++++++++----- src/Coordination/Changelog.h | 12 ++++ src/Coordination/tests/gtest_for_build.cpp | 76 ++++++++++++++++++++++ 3 files changed, 133 insertions(+), 15 deletions(-) diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index 6ec9b17d0a7..0d0fdaaa682 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -270,25 +270,36 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin { uint64_t total_read = 0; uint64_t entries_in_last = 0; - uint64_t incomplete_log_index = 0; + int64_t incomplete_log_index = -1; ChangelogReadResult result{}; + + /// First read log_id uint64_t first_read_index = 0; + /// We must start to read from this log index uint64_t start_to_read_from = last_commited_log_index; + + /// If we need to have some reserved log read additional `logs_to_keep` logs if (start_to_read_from > logs_to_keep) start_to_read_from -= logs_to_keep; else start_to_read_from = 1; bool started = false; + /// Got through changelog files in order op start_index for (const auto & [changelog_start_index, changelog_description] : existing_changelogs) { - entries_in_last = changelog_description.to_log_index - changelog_description.from_log_index + 1; + /// How many entries we have in the last changelog + entries_in_last = changelog_description.expectedEntriesCountInLog(); + /// [from_log_index.>=.......start_to_read_from.....<=.to_log_index] if (changelog_description.to_log_index >= start_to_read_from) { - if (!started) + if 
(!started) /// still nothing was read { + /// Our first log starts from the more fresh log_id than we required to read and this changelog is not empty log. + /// So we are missing something in our logs, but it's not dataloss, we will receive snapshot and required + /// entries fro leader. if (changelog_description.from_log_index > last_commited_log_index && (changelog_description.from_log_index - last_commited_log_index) > 1) { LOG_ERROR(log, "Some records was lost, last committed log index {}, smallest available log index on disk {}. Hopefully will receive missing records from leader.", last_commited_log_index, changelog_description.from_log_index); @@ -296,13 +307,19 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin break; } else if (changelog_description.from_log_index > start_to_read_from) + { + /// We don't have required amount of reserved logs, but nothing was lost. LOG_WARNING(log, "Don't have required amount of reserved log records. Need to read from {}, smallest available log index on disk {}.", start_to_read_from, changelog_description.from_log_index); + } } - started = true; ChangelogReader reader(changelog_description.path); result = reader.readChangelog(logs, start_to_read_from, index_to_start_pos, log); + + started = true; + + /// Otherwise we have already initialized it if (first_read_index == 0) first_read_index = result.first_read_index; @@ -319,10 +336,10 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin if (first_read_index != 0) start_index = first_read_index; - else + else /// We just may have no logs (only snapshot) start_index = last_commited_log_index; - if (incomplete_log_index != 0) + if (incomplete_log_index != -1) /// otherwise all logs completed so just start a new one { auto start_remove_from = existing_changelogs.begin(); if (started) @@ -340,6 +357,9 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin if (!existing_changelogs.empty()) { auto description = existing_changelogs.rbegin()->second; + if (description.expectedEntriesCountInLog() != rotate_interval) + LOG_TRACE(log, "Looks like rotate_logs_interval was changed, current {}, expected entries in last log {}", rotate_interval, description.expectedEntriesCountInLog()); + LOG_TRACE(log, "Continue to write into {}", description.path); current_writer = std::make_unique(description.path, WriteMode::Append, description.from_log_index); current_writer->setEntriesWritten(result.entries_read); @@ -378,7 +398,7 @@ void Changelog::rotate(uint64_t new_start_log_index) ChangelogRecord Changelog::buildRecord(uint64_t index, const LogEntryPtr & log_entry) { ChangelogRecord record; - record.header.version = ChangelogVersion::V0; + record.header.version = ChangelogVersion::V1; record.header.index = index; record.header.term = log_entry->get_term(); record.header.value_type = log_entry->get_val_type(); @@ -401,10 +421,13 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry) if (logs.empty()) start_index = index; - if (current_writer->getEntriesWritten() == rotate_interval) + const auto & current_changelog_description = existing_changelogs[current_writer->getStartIndex()]; + const bool log_is_complete = current_writer->getEntriesWritten() == current_changelog_description.expectedEntriesCountInLog(); + + if (log_is_complete) rotate(index); - auto offset = current_writer->appendRecord(buildRecord(index, log_entry)); + const auto offset = current_writer->appendRecord(buildRecord(index, log_entry)); if 
(!index_to_start_pos.try_emplace(index, offset).second) throw Exception(ErrorCodes::LOGICAL_ERROR, "Record with index {} already exists", index); @@ -416,26 +439,31 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry) if (index_to_start_pos.count(index) == 0) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index); - bool go_to_previous_file = index < current_writer->getStartIndex(); + /// This write_at require to overwrite everything in this file and also in previous file(s) + const bool go_to_previous_file = index < current_writer->getStartIndex(); + if (go_to_previous_file) { auto index_changelog = existing_changelogs.lower_bound(index); + ChangelogFileDescription description; - if (index_changelog->first == index) + + if (index_changelog->first == index) /// exactly this file starts from index description = index_changelog->second; else description = std::prev(index_changelog)->second; + /// Initialize writer from this log file current_writer = std::make_unique(description.path, WriteMode::Append, index_changelog->first); current_writer->setEntriesWritten(description.to_log_index - description.from_log_index + 1); } - auto entries_written = current_writer->getEntriesWritten(); + /// Truncate current file current_writer->truncateToLength(index_to_start_pos[index]); if (go_to_previous_file) { - /// Remove all subsequent files + /// Remove all subsequent files if overwritten something in previous one auto to_remove_itr = existing_changelogs.upper_bound(index); for (auto itr = to_remove_itr; itr != existing_changelogs.end();) { @@ -444,7 +472,9 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry) } } + auto entries_written = current_writer->getEntriesWritten(); /// Remove redundant logs from memory + /// Everything >= index must be removed for (uint64_t i = index; ; ++i) { auto log_itr = logs.find(i); @@ -454,9 +484,9 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry) index_to_start_pos.erase(i); entries_written--; } - current_writer->setEntriesWritten(entries_written); + /// Now we can actually override entry at index appendEntry(index, log_entry); } @@ -484,7 +514,7 @@ LogEntryPtr Changelog::getLastEntry() const { static LogEntryPtr fake_entry = nuraft::cs_new(0, nuraft::buffer::alloc(sizeof(uint64_t))); - uint64_t next_index = getNextEntryIndex() - 1; + const uint64_t next_index = getNextEntryIndex() - 1; auto entry = logs.find(next_index); if (entry == logs.end()) return fake_entry; diff --git a/src/Coordination/Changelog.h b/src/Coordination/Changelog.h index 893fe16abdf..44256ac6af6 100644 --- a/src/Coordination/Changelog.h +++ b/src/Coordination/Changelog.h @@ -53,6 +53,12 @@ struct ChangelogFileDescription uint64_t to_log_index; std::string path; + + /// How many entries should be stored in this log + uint64_t expectedEntriesCountInLog() const + { + return to_log_index - from_log_index + 1; + } }; class ChangelogWriter; @@ -128,10 +134,16 @@ private: const bool force_sync; Poco::Logger * log; + /// Currently existing changelogs std::map existing_changelogs; + + /// Current writer for changelog file std::unique_ptr current_writer; + /// Mapping log_id -> binary offset in log file IndexToOffset index_to_start_pos; + /// Mapping log_id -> log_entry IndexToLogEntry logs; + /// Start log_id which exists in all "active" logs uint64_t start_index = 0; }; diff --git a/src/Coordination/tests/gtest_for_build.cpp b/src/Coordination/tests/gtest_for_build.cpp index 
9a744d2bbed..47eadbf9720 100644 --- a/src/Coordination/tests/gtest_for_build.cpp +++ b/src/Coordination/tests/gtest_for_build.cpp @@ -1299,6 +1299,82 @@ TEST(CoordinationTest, TestEphemeralNodeRemove) } +TEST(CoordinationTest, TestRotateIntervalChanges) +{ + using namespace Coordination; + ChangelogDirTest snapshots("./logs"); + { + DB::KeeperLogStore changelog("./logs", 100, true); + + changelog.init(0, 3); + for (size_t i = 1; i < 55; ++i) + { + std::shared_ptr request = std::make_shared(); + request->path = "/hello_" + std::to_string(i); + auto entry = getLogEntryFromZKRequest(0, 1, request); + changelog.append(entry); + changelog.end_of_append_batch(0, 0); + } + } + + EXPECT_TRUE(fs::exists("./logs/changelog_0_99.bin")); + + DB::KeeperLogStore changelog_1("./logs", 10, true); + changelog_1.init(0, 50); + for (size_t i = 0; i < 55; ++i) + { + std::shared_ptr request = std::make_shared(); + request->path = "/hello_" + std::to_string(100 + i); + auto entry = getLogEntryFromZKRequest(0, 1, request); + changelog_1.append(entry); + changelog_1.end_of_append_batch(0, 0); + } + + EXPECT_TRUE(fs::exists("./logs/changelog_0_99.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_100_109.bin")); + + DB::KeeperLogStore changelog_2("./logs", 7, true); + changelog_2.init(98, 55); + + for (size_t i = 0; i < 17; ++i) + { + std::shared_ptr request = std::make_shared(); + request->path = "/hello_" + std::to_string(200 + i); + auto entry = getLogEntryFromZKRequest(0, 1, request); + changelog_2.append(entry); + changelog_2.end_of_append_batch(0, 0); + } + + changelog_2.compact(105); + EXPECT_FALSE(fs::exists("./logs/changelog_0_99.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_100_109.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_110_116.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_117_123.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_124_130.bin")); + + DB::KeeperLogStore changelog_3("./logs", 5, true); + changelog_3.init(116, 3); + for (size_t i = 0; i < 17; ++i) + { + std::shared_ptr request = std::make_shared(); + request->path = "/hello_" + std::to_string(300 + i); + auto entry = getLogEntryFromZKRequest(0, 1, request); + changelog_3.append(entry); + changelog_3.end_of_append_batch(0, 0); + } + + changelog_3.compact(125); + EXPECT_FALSE(fs::exists("./logs/changelog_100_109.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_110_116.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_117_123.bin")); + + EXPECT_TRUE(fs::exists("./logs/changelog_124_130.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_131_135.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_136_140.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_141_145.bin")); +} + + int main(int argc, char ** argv) { Poco::AutoPtr channel(new Poco::ConsoleChannel(std::cerr)); From 96b78f83a9228606851cf0b235e92b3accc38c56 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 26 Aug 2021 00:04:11 +0300 Subject: [PATCH 06/23] Fix typo --- src/Coordination/Changelog.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index 0d0fdaaa682..0a6e0c1a65e 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -299,7 +299,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin { /// Our first log starts from the more fresh log_id than we required to read and this changelog is not empty log. 
/// So we are missing something in our logs, but it's not dataloss, we will receive snapshot and required - /// entries fro leader. + /// entries from leader. if (changelog_description.from_log_index > last_commited_log_index && (changelog_description.from_log_index - last_commited_log_index) > 1) { LOG_ERROR(log, "Some records was lost, last committed log index {}, smallest available log index on disk {}. Hopefully will receive missing records from leader.", last_commited_log_index, changelog_description.from_log_index); From 64bc3285e9d2e175948f879fd1ce4bac195bab0b Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 26 Aug 2021 13:14:06 +0300 Subject: [PATCH 07/23] Fix merge with master --- src/Coordination/Changelog.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index 02328824c88..df5d8792b25 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -285,7 +285,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin /// Amount of entries in last log index uint64_t entries_in_last = 0; /// Log idx of the first incomplete log (key in existing_changelogs) - int64_t first_incomplete_log_start_index = 0; + int64_t first_incomplete_log_start_index = -1; /// if -1 then no incomplete log exists ChangelogReadResult result{}; /// First log index which was read from all changelogs @@ -460,7 +460,7 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index); /// This write_at require to overwrite everything in this file and also in previous file(s) -const bool go_to_previous_file = index < current_writer->getStartIndex(); + const bool go_to_previous_file = index < current_writer->getStartIndex(); if (go_to_previous_file) { From 0169fce78ee37d4d62c6b0ae24dcaaf6d081098a Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Thu, 26 Aug 2021 19:01:15 +0800 Subject: [PATCH 08/23] Projection bug fixes and refactoring. --- programs/local/LocalServer.cpp | 3 + programs/server/Server.cpp | 6 +- src/Interpreters/InterpreterCreateQuery.cpp | 9 +- src/Interpreters/InterpreterCreateQuery.h | 2 +- .../MergeTreeBaseSelectProcessor.cpp | 4 +- src/Storages/MergeTree/MergeTreeData.cpp | 93 +++++++---- src/Storages/MergeTree/MergeTreeData.h | 3 + .../MergeTree/MergeTreeDataMergerMutator.cpp | 34 ++-- .../MergeTree/MergeTreeDataPartInMemory.cpp | 36 ++++ .../MergeTree/MergeTreeDataSelectExecutor.cpp | 14 +- .../MergeTree/MergeTreeDataWriter.cpp | 155 +++++++++++------- src/Storages/MergeTree/MergeTreeDataWriter.h | 28 +++- .../MergeTree/MergeTreeWriteAheadLog.cpp | 14 +- .../MergeTree/MergeTreeWriteAheadLog.h | 2 + .../MergeTree/MergedBlockOutputStream.h | 1 + src/Storages/ProjectionsDescription.cpp | 24 +++ src/Storages/ProjectionsDescription.h | 2 + src/Storages/SelectQueryInfo.h | 1 + src/Storages/StorageReplicatedMergeTree.cpp | 3 + src/Storages/System/StorageSystemParts.cpp | 11 +- 20 files changed, 312 insertions(+), 133 deletions(-) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index b4496c121d5..2b1b6185321 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -271,6 +271,9 @@ try /// Load global settings from default_profile and system_profile. global_context->setDefaultProfiles(config()); + /// We load temporary database first, because projections need it. 
+ DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase(); + /** Init dummy default DB * NOTE: We force using isolated default database to avoid conflicts with default database from server environment * Otherwise, metadata of temporary File(format, EXPLICIT_PATH) tables will pollute metadata/ directory; diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index b7ef8cbec9c..ddbc4c4e433 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1116,15 +1116,15 @@ if (ThreadFuzzer::instance().isEffective()) try { + auto & database_catalog = DatabaseCatalog::instance(); + /// We load temporary database first, because projections need it. + database_catalog.initializeAndLoadTemporaryDatabase(); loadMetadataSystem(global_context); /// After attaching system databases we can initialize system log. global_context->initializeSystemLogs(); global_context->setSystemZooKeeperLogAfterInitializationIfNeeded(); - auto & database_catalog = DatabaseCatalog::instance(); /// After the system database is created, attach virtual system tables (in addition to query_log and part_log) attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper); - /// We load temporary database first, because projections need it. - database_catalog.initializeAndLoadTemporaryDatabase(); /// Then, load remaining databases loadMetadata(global_context, default_database); database_catalog.loadDatabases(); diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 76cb6c783ba..a1313a84c36 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -550,7 +550,7 @@ ConstraintsDescription InterpreterCreateQuery::getConstraintsDescription(const A } -InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(ASTCreateQuery & create) const +InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create) const { TableProperties properties; TableLockHolder as_storage_lock; @@ -589,10 +589,13 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS auto as_storage_metadata = as_storage->getInMemoryMetadataPtr(); properties.columns = as_storage_metadata->getColumns(); - /// Secondary indices make sense only for MergeTree family of storage engines. + /// Secondary indices and projections make sense only for MergeTree family of storage engines. /// We should not copy them for other storages. if (create.storage && endsWith(create.storage->engine->name, "MergeTree")) + { properties.indices = as_storage_metadata->getSecondaryIndices(); + properties.projections = as_storage_metadata->getProjections().clone(); + } properties.constraints = as_storage_metadata->getConstraints(); } @@ -910,7 +913,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) } /// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way. 
- TableProperties properties = setProperties(create); + TableProperties properties = getTablePropertiesAndNormalizeCreateQuery(create); DatabasePtr database; bool need_add_to_database = !create.temporary; diff --git a/src/Interpreters/InterpreterCreateQuery.h b/src/Interpreters/InterpreterCreateQuery.h index 7bd3ef25746..92f2929ea7b 100644 --- a/src/Interpreters/InterpreterCreateQuery.h +++ b/src/Interpreters/InterpreterCreateQuery.h @@ -74,7 +74,7 @@ private: BlockIO createTable(ASTCreateQuery & create); /// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way. - TableProperties setProperties(ASTCreateQuery & create) const; + TableProperties getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create) const; void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const; void setEngine(ASTCreateQuery & create) const; AccessRightsElements getRequiredAccess() const; diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index c91d60c5de7..2f46543b03c 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -287,7 +287,7 @@ static void injectVirtualColumnsImpl( { ColumnPtr column; if (rows) - column = DataTypeUUID().createColumnConst(rows, task->data_part->uuid)->convertToFullColumnIfConst(); + column = DataTypeUUID().createColumnConst(rows, part->uuid)->convertToFullColumnIfConst(); else column = DataTypeUUID().createColumn(); @@ -306,7 +306,7 @@ static void injectVirtualColumnsImpl( else if (virtual_column_name == "_partition_value") { if (rows) - inserter.insertPartitionValueColumn(rows, task->data_part->partition.value, partition_value_type, virtual_column_name); + inserter.insertPartitionValueColumn(rows, part->partition.value, partition_value_type, virtual_column_name); else inserter.insertPartitionValueColumn(rows, {}, partition_value_type, virtual_column_name); } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 764f5d7adf7..bdb3471fb01 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -757,16 +757,20 @@ DataTypePtr MergeTreeData::getPartitionValueType() const } -Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const +Block MergeTreeData::getSampleBlockWithVirtualColumns() const { DataTypePtr partition_value_type = getPartitionValueType(); - bool has_partition_value = typeid_cast(partition_value_type.get()); - Block block{ + return { ColumnWithTypeAndName(ColumnString::create(), std::make_shared(), "_part"), ColumnWithTypeAndName(ColumnString::create(), std::make_shared(), "_partition_id"), ColumnWithTypeAndName(ColumnUUID::create(), std::make_shared(), "_part_uuid"), ColumnWithTypeAndName(partition_value_type->createColumn(), partition_value_type, "_partition_value")}; +} + +Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const +{ + auto block = getSampleBlockWithVirtualColumns(); MutableColumns columns = block.mutateColumns(); auto & part_column = columns[0]; @@ -774,6 +778,7 @@ Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPar auto & part_uuid_column = columns[2]; auto & partition_value_column = columns[3]; + bool has_partition_value = typeid_cast(partition_value_column.get()); for 
(const auto & part_or_projection : parts) { const auto * part = part_or_projection->isProjectionPart() ? part_or_projection->getParentPart() : part_or_projection.get(); @@ -3465,7 +3470,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector( { for (const auto & part : range) { - for (const auto & [p_name, projection_part] : part->getProjectionParts()) + for (const auto & [_, projection_part] : part->getProjectionParts()) res.push_back(projection_part); } } @@ -4151,6 +4156,10 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( if (auto * select = query_ptr->as(); select && select->final()) return false; + // Currently projections don't support sampling yet. + if (settings.parallel_replicas_count > 1) + return false; + InterpreterSelectQuery select( query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias()); const auto & analysis_result = select.getAnalysisResult(); @@ -4194,13 +4203,13 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( candidate.remove_where_filter = analysis_result.remove_where_filter; candidate.before_where = analysis_result.before_where->clone(); - required_columns = candidate.before_where->foldActionsByProjection( + auto new_required_columns = candidate.before_where->foldActionsByProjection( required_columns, projection.sample_block_for_keys, candidate.where_column_name); - - if (required_columns.empty()) + if (new_required_columns.empty() && !required_columns.empty()) return false; + required_columns = std::move(new_required_columns); candidate.before_where->addAggregatesViaProjection(aggregates); } @@ -4214,33 +4223,35 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( for (const auto & column : prewhere_actions->getResultColumns()) required_columns.erase(column.name); - // Prewhere_action should not add missing keys. - prewhere_required_columns = prewhere_actions->foldActionsByProjection( - prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->prewhere_column_name, false); - - if (prewhere_required_columns.empty()) - return false; - candidate.prewhere_info->prewhere_actions = prewhere_actions; + { + // Prewhere_action should not add missing keys. 
+ auto new_prewhere_required_columns = prewhere_actions->foldActionsByProjection( + prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->prewhere_column_name, false); + if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty()) + return false; + prewhere_required_columns = std::move(new_prewhere_required_columns); + candidate.prewhere_info->prewhere_actions = prewhere_actions; + } if (candidate.prewhere_info->row_level_filter) { auto row_level_filter_actions = candidate.prewhere_info->row_level_filter->clone(); - prewhere_required_columns = row_level_filter_actions->foldActionsByProjection( + auto new_prewhere_required_columns = row_level_filter_actions->foldActionsByProjection( prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->row_level_column_name, false); - - if (prewhere_required_columns.empty()) + if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty()) return false; + prewhere_required_columns = std::move(new_prewhere_required_columns); candidate.prewhere_info->row_level_filter = row_level_filter_actions; } if (candidate.prewhere_info->alias_actions) { auto alias_actions = candidate.prewhere_info->alias_actions->clone(); - prewhere_required_columns + auto new_prewhere_required_columns = alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false); - - if (prewhere_required_columns.empty()) + if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty()) return false; + prewhere_required_columns = std::move(new_prewhere_required_columns); candidate.prewhere_info->alias_actions = alias_actions; } required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end()); @@ -4259,11 +4270,20 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( return match; }; - for (const auto & projection : metadata_snapshot->projections) + auto virtual_block = getSampleBlockWithVirtualColumns(); + auto add_projection_candidate = [&](const ProjectionDescription & projection) { ProjectionCandidate candidate{}; candidate.desc = &projection; + auto sample_block = projection.sample_block; + auto sample_block_for_keys = projection.sample_block_for_keys; + for (const auto & column : virtual_block) + { + sample_block.insertUnique(column); + sample_block_for_keys.insertUnique(column); + } + if (projection.type == ProjectionDescription::Type::Aggregate && analysis_result.need_aggregate && can_use_aggregate_projection) { bool match = true; @@ -4271,7 +4291,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( // Let's first check if all aggregates are provided by current projection for (const auto & aggregate : select.getQueryAnalyzer()->aggregates()) { - const auto * column = projection.sample_block.findByName(aggregate.column_name); + const auto * column = sample_block.findByName(aggregate.column_name); if (column) { aggregates.insert(*column); @@ -4284,25 +4304,25 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( } if (!match) - continue; + return; // Check if all aggregation keys can be either provided by some action, or by current // projection directly. Reshape the `before_aggregation` action DAG so that it only - // needs to provide aggregation keys, and certain children DAG might be substituted by - // some keys in projection. + // needs to provide aggregation keys, and the DAG of certain child might be substituted + // by some keys in projection. 
candidate.before_aggregation = analysis_result.before_aggregation->clone(); - auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, projection.sample_block_for_keys); + auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, sample_block_for_keys); // TODO Let's find out the exact required_columns for keys. if (required_columns.empty() && (!keys.empty() && !candidate.before_aggregation->getRequiredColumns().empty())) - continue; + return; if (analysis_result.optimize_aggregation_in_order) { for (const auto & key : keys) { auto actions_dag = analysis_result.before_aggregation->clone(); - actions_dag->foldActionsByProjection({key}, projection.sample_block_for_keys); + actions_dag->foldActionsByProjection({key}, sample_block_for_keys); candidate.group_by_elements_actions.emplace_back(std::make_shared(actions_dag, actions_settings)); } } @@ -4311,7 +4331,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( candidate.before_aggregation->reorderAggregationKeysForProjection(key_name_pos_map); candidate.before_aggregation->addAggregatesViaProjection(aggregates); - if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block_for_keys, aggregates)) + if (rewrite_before_where(candidate, projection, required_columns, sample_block_for_keys, aggregates)) { candidate.required_columns = {required_columns.begin(), required_columns.end()}; for (const auto & aggregate : aggregates) @@ -4328,13 +4348,16 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( for (const auto & column : actions->getRequiredColumns()) required_columns.insert(column.name); - if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block, {})) + if (rewrite_before_where(candidate, projection, required_columns, sample_block, {})) { candidate.required_columns = {required_columns.begin(), required_columns.end()}; candidates.push_back(std::move(candidate)); } } - } + }; + + for (const auto & projection : metadata_snapshot->projections) + add_projection_candidate(projection); // Let's select the best projection to execute the query. if (!candidates.empty()) @@ -4409,6 +4432,14 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( if (!selected_candidate) return false; + else if (min_sum_marks == 0) + { + /// If selected_projection indicated an empty result set. Remember it in query_info but + /// don't use projection to run the query, because projection pipeline with empty result + /// set will not work correctly with empty_result_for_aggregation_by_empty_set. + query_info.merge_tree_empty_result = true; + return false; + } if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate) { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 05d1b45a557..ef5f22ed096 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -795,6 +795,9 @@ public: /// Return the partition expression types as a Tuple type. Return DataTypeUInt8 if partition expression is empty. DataTypePtr getPartitionValueType() const; + /// Construct a sample block of virtual columns. + Block getSampleBlockWithVirtualColumns() const; + /// Construct a block consisting only of possible virtual columns for part pruning. /// If one_part is true, fill in at most one part. 
Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index c2a0e5f0650..00a599af9c3 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -89,6 +89,7 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_) future_part_type = std::min(future_part_type, part->getType()); } + /// NOTE: We don't support merging into an in-memory part yet. auto chosen_type = parts_.front()->storage.choosePartTypeOnDisk(sum_bytes_uncompressed, sum_rows); future_part_type = std::min(future_part_type, chosen_type); assign(std::move(parts_), future_part_type); @@ -2014,10 +2015,19 @@ void MergeTreeDataMergerMutator::writeWithProjections( std::map projection_parts; Block block; std::vector projection_squashes; + const auto & settings = context->getSettingsRef(); for (size_t i = 0, size = projections_to_build.size(); i < size; ++i) { - projection_squashes.emplace_back(65536, 65536 * 256); + // If the parent part is an in-memory part, squash projection output into one block and + // build in-memory projection because we don't support merging into a new in-memory part. + // Otherwise we split the materialization into multiple stages similar to the process of + // INSERT SELECT query. + if (new_data_part->getType() == MergeTreeDataPartType::IN_MEMORY) + projection_squashes.emplace_back(0, 0); + else + projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); } + while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read())) { if (minmax_idx) @@ -2028,26 +2038,10 @@ void MergeTreeDataMergerMutator::writeWithProjections( for (size_t i = 0, size = projections_to_build.size(); i < size; ++i) { const auto & projection = projections_to_build[i]->projection; - auto in = InterpreterSelectQuery( - projection.query_ast, - context, - Pipe(std::make_shared(block, Chunk(block.getColumns(), block.rows()))), - SelectQueryOptions{ - projection.type == ProjectionDescription::Type::Normal ? 
QueryProcessingStage::FetchColumns : QueryProcessingStage::WithMergeableState}) - .execute() - .getInputStream(); - in = std::make_shared(in, block.rows(), std::numeric_limits::max()); - in->readPrefix(); - auto & projection_squash = projection_squashes[i]; - auto projection_block = projection_squash.add(in->read()); - if (in->read()) - throw Exception("Projection cannot increase the number of rows in a block", ErrorCodes::LOGICAL_ERROR); - in->readSuffix(); + auto projection_block = projection_squashes[i].add(projection.calculate(block, context)); if (projection_block) - { - projection_parts[projection.name].emplace_back( - MergeTreeDataWriter::writeTempProjectionPart(data, log, projection_block, projection, new_data_part.get(), ++block_num)); - } + projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart( + data, log, projection_block, projection, new_data_part.get(), ++block_num)); } merge_entry->rows_written += block.rows(); diff --git a/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp b/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp index e929bfc6862..635da7e2ede 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp @@ -94,6 +94,42 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec); out.writePrefix(); out.write(block); + const auto & projections = metadata_snapshot->getProjections(); + for (const auto & [projection_name, projection] : projection_parts) + { + if (projections.has(projection_name)) + { + String projection_destination_path = fs::path(destination_path) / projection_name / ".proj"; + if (disk->exists(projection_destination_path)) + { + throw Exception( + ErrorCodes::DIRECTORY_ALREADY_EXISTS, + "Could not flush projection part {}. 
Projection part in {} already exists", + projection_name, + fullPath(disk, projection_destination_path)); + } + + auto projection_part = asInMemoryPart(projection); + auto projection_type = storage.choosePartTypeOnDisk(projection_part->block.bytes(), rows_count); + MergeTreePartInfo projection_info("all", 0, 0, 0); + auto projection_data_part + = storage.createPart(projection_name, projection_type, projection_info, volume, projection_name + ".proj", parent_part); + projection_data_part->is_temp = false; // clean up will be done on parent part + projection_data_part->setColumns(projection->getColumns()); + + disk->createDirectories(projection_destination_path); + const auto & desc = projections.get(name); + auto projection_compression_codec = storage.getContext()->chooseCompressionCodec(0, 0); + auto projection_indices = MergeTreeIndexFactory::instance().getMany(desc.metadata->getSecondaryIndices()); + MergedBlockOutputStream projection_out( + projection_data_part, desc.metadata, projection_part->columns, projection_indices, projection_compression_codec); + projection_out.writePrefix(); + projection_out.write(projection_part->block); + projection_out.writeSuffixAndFinalizePart(projection_data_part); + new_data_part->addProjectionPart(projection_name, std::move(projection_data_part)); + } + } + out.writeSuffixAndFinalizePart(new_data_part); } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index f5c1890154a..004eaa6254c 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -132,6 +132,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( QueryProcessingStage::Enum processed_stage, std::shared_ptr max_block_numbers_to_read) const { + if (query_info.merge_tree_empty_result) + return std::make_unique(); + const auto & settings = context->getSettingsRef(); if (!query_info.projection) { @@ -181,7 +184,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( max_block_numbers_to_read, query_info.projection->merge_tree_projection_select_result_ptr); - if (plan) + if (plan->isInitialized()) { // If `before_where` is not empty, transform input blocks by adding needed columns // originated from key columns. 
We already project the block at the end, using @@ -237,7 +240,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( ordinary_query_plan.addStep(std::move(where_step)); } - ordinary_pipe = QueryPipeline::getPipe(interpreter.execute().pipeline); + ordinary_pipe = ordinary_query_plan.convertToPipe( + QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); } if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate) @@ -351,12 +355,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( pipes.emplace_back(std::move(projection_pipe)); pipes.emplace_back(std::move(ordinary_pipe)); auto pipe = Pipe::unitePipes(std::move(pipes)); - pipe.resize(1); + auto plan = std::make_unique(); + if (pipe.empty()) + return plan; + pipe.resize(1); auto step = std::make_unique( std::move(pipe), fmt::format("MergeTree(with {} projection {})", query_info.projection->desc->type, query_info.projection->desc->name)); - auto plan = std::make_unique(); plan->addStep(std::move(step)); return plan; } diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 0b05650b42c..180c18ed1b5 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -386,31 +386,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart( sync_guard = disk->getDirectorySyncGuard(full_path); } - if (metadata_snapshot->hasProjections()) - { - for (const auto & projection : metadata_snapshot->getProjections()) - { - auto in = InterpreterSelectQuery( - projection.query_ast, - context, - Pipe(std::make_shared(block, Chunk(block.getColumns(), block.rows()))), - SelectQueryOptions{ - projection.type == ProjectionDescription::Type::Normal ? 
QueryProcessingStage::FetchColumns : QueryProcessingStage::WithMergeableState}) - .execute() - .getInputStream(); - in = std::make_shared(in, block.rows(), std::numeric_limits::max()); - in->readPrefix(); - auto projection_block = in->read(); - if (in->read()) - throw Exception("Projection cannot grow block rows", ErrorCodes::LOGICAL_ERROR); - in->readSuffix(); - if (projection_block.rows()) - { - new_data_part->addProjectionPart(projection.name, writeProjectionPart(projection_block, projection, new_data_part.get())); - } - } - } - if (metadata_snapshot->hasRowsTTL()) updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true); @@ -439,6 +414,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart( out.writePrefix(); out.writeWithPermutation(block, perm_ptr); + + for (const auto & projection : metadata_snapshot->getProjections()) + { + auto projection_block = projection.calculate(block, context); + if (projection_block.rows()) + new_data_part->addProjectionPart( + projection.name, writeProjectionPart(data, log, projection_block, projection, new_data_part.get())); + } out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows()); @@ -449,18 +432,28 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart( } MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl( - MergeTreeData & data, + const String part_name, + MergeTreeDataPartType part_type, + const String & relative_path, + bool is_temp, + const IMergeTreeDataPart * parent_part, + const MergeTreeData & data, Poco::Logger * log, Block block, - const StorageMetadataPtr & metadata_snapshot, - MergeTreeData::MutableDataPartPtr && new_data_part) + const StorageMetadataPtr & metadata_snapshot) { + MergeTreePartInfo new_part_info("all", 0, 0, 0); + auto new_data_part = data.createPart( + part_name, + part_type, + new_part_info, + parent_part->volume, + relative_path, + parent_part); + new_data_part->is_temp = is_temp; + NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames()); - MergeTreePartition partition{}; - IMergeTreeDataPart::MinMaxIndex minmax_idx{}; new_data_part->setColumns(columns); - new_data_part->partition = std::move(partition); - new_data_part->minmax_idx = std::move(minmax_idx); if (new_data_part->isStoredOnDisk()) { @@ -523,27 +516,41 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl( ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterUncompressedBytes, block.bytes()); ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterCompressedBytes, new_data_part->getBytesOnDisk()); - return std::move(new_data_part); + return new_data_part; } -MergeTreeData::MutableDataPartPtr -MergeTreeDataWriter::writeProjectionPart(Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part) +MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPart( + MergeTreeData & data, Poco::Logger * log, Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part) { - /// Size of part would not be greater than block.bytes() + epsilon - size_t expected_size = block.bytes(); - - // just check if there is enough space on parent volume - data.reserveSpace(expected_size, parent_part->volume); - String part_name = projection.name; - MergeTreePartInfo new_part_info("all", 0, 0, 0); - 
auto new_data_part = data.createPart( - part_name, data.choosePartType(expected_size, block.rows()), new_part_info, parent_part->volume, part_name + ".proj", parent_part); - new_data_part->is_temp = false; // clean up will be done on parent part + MergeTreeDataPartType part_type; + if (parent_part->getType() == MergeTreeDataPartType::IN_MEMORY) + { + part_type = MergeTreeDataPartType::IN_MEMORY; + } + else + { + /// Size of part would not be greater than block.bytes() + epsilon + size_t expected_size = block.bytes(); + // just check if there is enough space on parent volume + data.reserveSpace(expected_size, parent_part->volume); + part_type = data.choosePartTypeOnDisk(expected_size, block.rows()); + } - return writeProjectionPartImpl(data, log, block, projection.metadata, std::move(new_data_part)); + return writeProjectionPartImpl( + part_name, + part_type, + part_name + ".proj" /* relative_path */, + false /* is_temp */, + parent_part, + data, + log, + block, + projection.metadata); } +/// This is used for projection materialization process which may contain multiple stages of +/// projection part merges. MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart( MergeTreeData & data, Poco::Logger * log, @@ -552,24 +559,50 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart( const IMergeTreeDataPart * parent_part, size_t block_num) { - /// Size of part would not be greater than block.bytes() + epsilon - size_t expected_size = block.bytes(); - - // just check if there is enough space on parent volume - data.reserveSpace(expected_size, parent_part->volume); - String part_name = fmt::format("{}_{}", projection.name, block_num); - MergeTreePartInfo new_part_info("all", 0, 0, 0); - auto new_data_part = data.createPart( + MergeTreeDataPartType part_type; + if (parent_part->getType() == MergeTreeDataPartType::IN_MEMORY) + { + part_type = MergeTreeDataPartType::IN_MEMORY; + } + else + { + /// Size of part would not be greater than block.bytes() + epsilon + size_t expected_size = block.bytes(); + // just check if there is enough space on parent volume + data.reserveSpace(expected_size, parent_part->volume); + part_type = data.choosePartTypeOnDisk(expected_size, block.rows()); + } + + return writeProjectionPartImpl( part_name, - data.choosePartType(expected_size, block.rows()), - new_part_info, - parent_part->volume, - "tmp_insert_" + part_name + ".proj", - parent_part); - new_data_part->is_temp = true; // It's part for merge + part_type, + "tmp_insert_" + part_name + ".proj" /* relative_path */, + true /* is_temp */, + parent_part, + data, + log, + block, + projection.metadata); +} - return writeProjectionPartImpl(data, log, block, projection.metadata, std::move(new_data_part)); +MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeInMemoryProjectionPart( + const MergeTreeData & data, + Poco::Logger * log, + Block block, + const ProjectionDescription & projection, + const IMergeTreeDataPart * parent_part) +{ + return writeProjectionPartImpl( + projection.name, + MergeTreeDataPartType::IN_MEMORY, + projection.name + ".proj" /* relative_path */, + false /* is_temp */, + parent_part, + data, + log, + block, + projection.metadata); } } diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h index feb2f1e2b12..006f897c3e2 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.h +++ b/src/Storages/MergeTree/MergeTreeDataWriter.h @@ -49,9 +49,15 @@ public: MergeTreeData::MutableDataPartPtr 
writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context); - MergeTreeData::MutableDataPartPtr writeProjectionPart( - Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part); + /// For insertion. + static MergeTreeData::MutableDataPartPtr writeProjectionPart( + MergeTreeData & data, + Poco::Logger * log, + Block block, + const ProjectionDescription & projection, + const IMergeTreeDataPart * parent_part); + /// For mutation: MATERIALIZE PROJECTION. static MergeTreeData::MutableDataPartPtr writeTempProjectionPart( MergeTreeData & data, Poco::Logger * log, @@ -60,15 +66,27 @@ public: const IMergeTreeDataPart * parent_part, size_t block_num); + /// For WriteAheadLog AddPart. + static MergeTreeData::MutableDataPartPtr writeInMemoryProjectionPart( + const MergeTreeData & data, + Poco::Logger * log, + Block block, + const ProjectionDescription & projection, + const IMergeTreeDataPart * parent_part); + Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation); private: static MergeTreeData::MutableDataPartPtr writeProjectionPartImpl( - MergeTreeData & data, + const String part_name, + MergeTreeDataPartType part_type, + const String & relative_path, + bool is_temp, + const IMergeTreeDataPart * parent_part, + const MergeTreeData & data, Poco::Logger * log, Block block, - const StorageMetadataPtr & metadata_snapshot, - MergeTreeData::MutableDataPartPtr && new_data_part); + const StorageMetadataPtr & metadata_snapshot); MergeTreeData & data; diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp index 1fcd28b70e3..d8fb50a866c 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -31,6 +32,7 @@ MergeTreeWriteAheadLog::MergeTreeWriteAheadLog( , name(name_) , path(storage.getRelativeDataPath() + name_) , pool(storage.getContext()->getSchedulePool()) + , log(&Poco::Logger::get(storage.getLogName() + " (WriteAheadLog)")) { init(); sync_task = pool.createTask("MergeTreeWriteAheadLog::sync", [this] @@ -172,8 +174,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor || e.code() == ErrorCodes::BAD_DATA_PART_NAME || e.code() == ErrorCodes::CORRUPTED_DATA) { - LOG_WARNING(&Poco::Logger::get(storage.getLogName() + " (WriteAheadLog)"), - "WAL file '{}' is broken. {}", path, e.displayText()); + LOG_WARNING(log, "WAL file '{}' is broken. {}", path, e.displayText()); /// If file is broken, do not write new parts to it. /// But if it contains any part rotate and save them. 
@@ -203,6 +204,15 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor part_out.writePrefix(); part_out.write(block); + + for (const auto & projection : metadata_snapshot->getProjections()) + { + auto projection_block = projection.calculate(block, context); + if (projection_block.rows()) + part->addProjectionPart( + projection.name, + MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get())); + } part_out.writeSuffixAndFinalizePart(part); min_block_number = std::min(min_block_number, part->info.min_block); diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h index e01911aa8b8..8d1ea3c332e 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h @@ -91,6 +91,8 @@ private: bool sync_scheduled = false; mutable std::mutex write_mutex; + + Poco::Logger * log; }; } diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.h b/src/Storages/MergeTree/MergedBlockOutputStream.h index d04df598218..4c36508ebf5 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -34,6 +34,7 @@ public: void writeSuffix() override; /// Finilize writing part and fill inner structures + /// If part is new and contains projections, they should be added before invoking this method. void writeSuffixAndFinalizePart( MergeTreeData::MutableDataPartPtr & new_part, bool sync = false, diff --git a/src/Storages/ProjectionsDescription.cpp b/src/Storages/ProjectionsDescription.cpp index dd48b23ecc3..5fc44bc044f 100644 --- a/src/Storages/ProjectionsDescription.cpp +++ b/src/Storages/ProjectionsDescription.cpp @@ -14,6 +14,7 @@ #include #include +#include namespace DB { @@ -23,6 +24,7 @@ namespace ErrorCodes extern const int NO_SUCH_PROJECTION_IN_TABLE; extern const int ILLEGAL_PROJECTION; extern const int NOT_IMPLEMENTED; + extern const int LOGICAL_ERROR; }; const char * ProjectionDescription::typeToString(Type type) @@ -192,6 +194,28 @@ void ProjectionDescription::recalculateWithNewColumns(const ColumnsDescription & *this = getProjectionFromAST(definition_ast, new_columns, query_context); } + +Block ProjectionDescription::calculate(const Block & block, ContextPtr context) const +{ + auto in = InterpreterSelectQuery( + query_ast, + context, + Pipe(std::make_shared(block, Chunk(block.getColumns(), block.rows()))), + SelectQueryOptions{ + type == ProjectionDescription::Type::Normal ? 
QueryProcessingStage::FetchColumns + : QueryProcessingStage::WithMergeableState}) + .execute() + .getInputStream(); + in = std::make_shared(in, block.rows(), 0); + in->readPrefix(); + auto ret = in->read(); + if (in->read()) + throw Exception("Projection cannot increase the number of rows in a block", ErrorCodes::LOGICAL_ERROR); + in->readSuffix(); + return ret; +} + + String ProjectionsDescription::toString() const { if (empty()) diff --git a/src/Storages/ProjectionsDescription.h b/src/Storages/ProjectionsDescription.h index fd505c4fe06..2b279c711fe 100644 --- a/src/Storages/ProjectionsDescription.h +++ b/src/Storages/ProjectionsDescription.h @@ -85,6 +85,8 @@ struct ProjectionDescription void recalculateWithNewColumns(const ColumnsDescription & new_columns, ContextPtr query_context); bool isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const; + + Block calculate(const Block & block, ContextPtr context) const; }; /// All projections in storage diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index a4536e1ff58..a2db6655223 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -163,6 +163,7 @@ struct SelectQueryInfo std::optional projection; bool ignore_projections = false; bool is_projection_query = false; + bool merge_tree_empty_result = false; MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr; }; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index f76f0881438..4cf2042d724 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7540,6 +7540,9 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP out.writePrefix(); out.write(block); + /// TODO(ab): What projections should we add to the empty part? How can we make sure that it + /// won't block future merges? Perhaps we should also check part emptiness when selecting parts + /// to merge. out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert); try diff --git a/src/Storages/System/StorageSystemParts.cpp b/src/Storages/System/StorageSystemParts.cpp index 6a643dbe1b9..dba05d44969 100644 --- a/src/Storages/System/StorageSystemParts.cpp +++ b/src/Storages/System/StorageSystemParts.cpp @@ -75,7 +75,9 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_) {"rows_where_ttl_info.expression", std::make_shared(std::make_shared())}, {"rows_where_ttl_info.min", std::make_shared(std::make_shared())}, - {"rows_where_ttl_info.max", std::make_shared(std::make_shared())} + {"rows_where_ttl_info.max", std::make_shared(std::make_shared())}, + + {"projections", std::make_shared(std::make_shared())}, } ) { @@ -253,6 +255,13 @@ void StorageSystemParts::processNextStorage( add_ttl_info_map(part->ttl_infos.group_by_ttl); add_ttl_info_map(part->ttl_infos.rows_where_ttl); + Array projections; + for (const auto & [name, _] : part->getProjectionParts()) + projections.push_back(name); + + if (columns_mask[src_index++]) + columns[res_index++]->insert(projections); + /// _state column should be the latest. 
/// Do not use part->getState*, it can be changed from different thread if (has_state_column) From ebfac8cfbbd9f620da9df88d1dd50521d599c46c Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 26 Aug 2021 14:50:08 +0300 Subject: [PATCH 09/23] More correct list watches semantics in ClickHouse Keeper --- src/Coordination/KeeperStorage.cpp | 44 +++++++++---- .../test_keeper_back_to_back/test.py | 66 ++++++++++++++++++- 2 files changed, 96 insertions(+), 14 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 8bffdbe0222..3053ce17ad1 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -151,19 +151,39 @@ static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & pat } auto parent_path = parentPath(path); - it = list_watches.find(parent_path); - if (it != list_watches.end()) - { - std::shared_ptr watch_list_response = std::make_shared(); - watch_list_response->path = parent_path; - watch_list_response->xid = Coordination::WATCH_XID; - watch_list_response->zxid = -1; - watch_list_response->type = Coordination::Event::CHILD; - watch_list_response->state = Coordination::State::CONNECTED; - for (auto watcher_session : it->second) - result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_list_response}); - list_watches.erase(it); + Strings paths_to_check_for_list_watches; + if (event_type == Coordination::Event::CREATED) + { + paths_to_check_for_list_watches.push_back(parent_path); /// Trigger list watches for parent + } + else if (event_type == Coordination::Event::DELETED) + { + paths_to_check_for_list_watches.push_back(path); /// Trigger both list watches for this path + paths_to_check_for_list_watches.push_back(parent_path); /// And for parent path + } + /// CHANGED event never trigger list wathes + + for (const auto & path_to_check : paths_to_check_for_list_watches) + { + it = list_watches.find(path_to_check); + if (it != list_watches.end()) + { + std::shared_ptr watch_list_response = std::make_shared(); + watch_list_response->path = path_to_check; + watch_list_response->xid = Coordination::WATCH_XID; + watch_list_response->zxid = -1; + if (path_to_check == parent_path) + watch_list_response->type = Coordination::Event::CHILD; + else + watch_list_response->type = Coordination::Event::DELETED; + + watch_list_response->state = Coordination::State::CONNECTED; + for (auto watcher_session : it->second) + result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_list_response}); + + list_watches.erase(it); + } } return result; } diff --git a/tests/integration/test_keeper_back_to_back/test.py b/tests/integration/test_keeper_back_to_back/test.py index 64f2f42d71e..f73b4671798 100644 --- a/tests/integration/test_keeper_back_to_back/test.py +++ b/tests/integration/test_keeper_back_to_back/test.py @@ -218,6 +218,10 @@ def test_watchers(started_cluster): print("Fake data", fake_data_watch_data) assert genuine_data_watch_data == fake_data_watch_data + + genuine_zk.create("/test_data_watches/child", b"a") + fake_zk.create("/test_data_watches/child", b"a") + genuine_children = None def genuine_child_callback(event): print("Genuine child watch called") @@ -233,16 +237,74 @@ def test_watchers(started_cluster): genuine_zk.get_children("/test_data_watches", watch=genuine_child_callback) fake_zk.get_children("/test_data_watches", watch=fake_child_callback) + print("Calling non related genuine child") + genuine_zk.set("/test_data_watches/child", b"q") + 
genuine_zk.set("/test_data_watches", b"q") + + print("Calling non related fake child") + fake_zk.set("/test_data_watches/child", b"q") + fake_zk.set("/test_data_watches", b"q") + + time.sleep(3) + + assert genuine_children == None + assert fake_children == None + print("Calling genuine child") - genuine_zk.create("/test_data_watches/child", b"b") + genuine_zk.create("/test_data_watches/child_new", b"b") print("Calling fake child") - fake_zk.create("/test_data_watches/child", b"b") + fake_zk.create("/test_data_watches/child_new", b"b") time.sleep(3) print("Genuine children", genuine_children) print("Fake children", fake_children) assert genuine_children == fake_children + + genuine_children_delete = None + def genuine_child_delete_callback(event): + print("Genuine child watch called") + nonlocal genuine_children_delete + genuine_children_delete = event + + fake_children_delete = None + def fake_child_delete_callback(event): + print("Fake child watch called") + nonlocal fake_children_delete + fake_children_delete = event + + genuine_child_delete = None + def genuine_own_delete_callback(event): + print("Genuine child watch called") + nonlocal genuine_child_delete + genuine_child_delete = event + + fake_child_delete = None + def fake_own_delete_callback(event): + print("Fake child watch called") + nonlocal fake_child_delete + fake_child_delete = event + + genuine_zk.get_children("/test_data_watches", watch=genuine_child_delete_callback) + fake_zk.get_children("/test_data_watches", watch=fake_child_delete_callback) + genuine_zk.get_children("/test_data_watches/child", watch=genuine_own_delete_callback) + fake_zk.get_children("/test_data_watches/child", watch=fake_own_delete_callback) + + print("Calling genuine child delete") + genuine_zk.delete("/test_data_watches/child") + print("Calling fake child delete") + fake_zk.delete("/test_data_watches/child") + + time.sleep(3) + + print("Genuine children delete", genuine_children_delete) + print("Fake children delete", fake_children_delete) + assert genuine_children_delete == fake_children_delete + + print("Genuine child delete", genuine_child_delete) + print("Fake child delete", fake_child_delete) + assert genuine_child_delete == fake_child_delete + finally: for zk in [genuine_zk, fake_zk]: stop_zk(zk) From 23325c3fa6f1d16ccb6ae420ef72589c647d34f9 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 26 Aug 2021 19:00:41 +0300 Subject: [PATCH 10/23] Fix rare case when watch response received before request response --- src/Coordination/KeeperStateMachine.cpp | 46 ++++++++++--------------- src/Coordination/KeeperStateMachine.h | 8 +++-- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 2e5e7214e3e..ffbac0656b9 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -106,31 +106,26 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n { const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast(*request_for_session.request); int64_t session_id; - { - std::lock_guard lock(storage_lock); - session_id = storage->getSessionID(session_id_request.session_timeout_ms); - } - LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms); - std::shared_ptr response = std::make_shared(); response->internal_id = session_id_request.internal_id; - response->session_id = session_id; response->server_id = session_id_request.server_id; - 
KeeperStorage::ResponseForSession response_for_session; response_for_session.session_id = -1; response_for_session.response = response; - responses_queue.push(response_for_session); + { + std::lock_guard lock(storage_and_responses_lock); + session_id = storage->getSessionID(session_id_request.session_timeout_ms); + LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms); + response->session_id = session_id; + responses_queue.push(response_for_session); + } } else { - KeeperStorage::ResponsesForSessions responses_for_sessions; - { - std::lock_guard lock(storage_lock); - responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx); - for (auto & response_for_session : responses_for_sessions) - responses_queue.push(response_for_session); - } + std::lock_guard lock(storage_and_responses_lock); + KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx); + for (auto & response_for_session : responses_for_sessions) + responses_queue.push(response_for_session); } last_committed_idx = log_idx; @@ -150,7 +145,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) } { /// deserialize and apply snapshot to storage - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr); } @@ -175,7 +170,7 @@ void KeeperStateMachine::create_snapshot( auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf); CreateSnapshotTask snapshot_task; { /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking. 
- std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); snapshot_task.snapshot = std::make_shared(storage.get(), snapshot_meta_copy); } @@ -198,7 +193,7 @@ void KeeperStateMachine::create_snapshot( { /// Must do it with lock (clearing elements from list) - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); /// Turn off "snapshot mode" and clear outdate part of storage state storage->clearGarbageAfterSnapshot(); /// Destroy snapshot with lock @@ -236,7 +231,7 @@ void KeeperStateMachine::save_logical_snp_obj( nuraft::ptr cloned_meta; if (obj_id == 0) /// Fake snapshot required by NuRaft at startup { - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx()); cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot); } @@ -303,24 +298,21 @@ int KeeperStateMachine::read_logical_snp_obj( void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session) { /// Pure local request, just process it with storage - KeeperStorage::ResponsesForSessions responses; - { - std::lock_guard lock(storage_lock); - responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt); - } + std::lock_guard lock(storage_and_responses_lock); + auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt); for (const auto & response : responses) responses_queue.push(response); } std::unordered_set KeeperStateMachine::getDeadSessions() { - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); return storage->getDeadSessions(); } void KeeperStateMachine::shutdownStorage() { - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); storage->finalize(); } diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 06be270b66e..32beaaf69e6 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -99,8 +99,12 @@ private: /// Mutex for snapshots std::mutex snapshots_lock; - /// Lock for storage - std::mutex storage_lock; + /// Lock for storage and responses_queue. It's important to process requests + /// and push them to the responses queue while holding this lock. Otherwise + /// we can get strange cases when, for example client send read request with + /// watch and after that receive watch response and only receive response + /// for request. + std::mutex storage_and_responses_lock; /// Last committed Raft log number. std::atomic last_committed_idx; From 9c028be1e409b88a8dd75cc8922bd5d02fc22cb8 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 26 Aug 2021 21:19:12 +0300 Subject: [PATCH 11/23] Fix insertion of fields with Infinity values in nullable low cardinality columns. 
--- src/Columns/ColumnUnique.h | 2 +- .../0_stateless/2013_lc_nullable_and_infinity.reference | 4 ++++ tests/queries/0_stateless/2013_lc_nullable_and_infinity.sql | 3 +++ 3 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 tests/queries/0_stateless/2013_lc_nullable_and_infinity.reference create mode 100644 tests/queries/0_stateless/2013_lc_nullable_and_infinity.sql diff --git a/src/Columns/ColumnUnique.h b/src/Columns/ColumnUnique.h index bfa80b5e3b2..72904c5ab8f 100644 --- a/src/Columns/ColumnUnique.h +++ b/src/Columns/ColumnUnique.h @@ -301,7 +301,7 @@ size_t ColumnUnique::getNullValueIndex() const template size_t ColumnUnique::uniqueInsert(const Field & x) { - if (x.getType() == Field::Types::Null) + if (x.isNull()) return getNullValueIndex(); if (valuesHaveFixedSize()) diff --git a/tests/queries/0_stateless/2013_lc_nullable_and_infinity.reference b/tests/queries/0_stateless/2013_lc_nullable_and_infinity.reference new file mode 100644 index 00000000000..ef5038b2236 --- /dev/null +++ b/tests/queries/0_stateless/2013_lc_nullable_and_infinity.reference @@ -0,0 +1,4 @@ +0 \N + +0 \N +0 \N diff --git a/tests/queries/0_stateless/2013_lc_nullable_and_infinity.sql b/tests/queries/0_stateless/2013_lc_nullable_and_infinity.sql new file mode 100644 index 00000000000..c1c8a9c00b1 --- /dev/null +++ b/tests/queries/0_stateless/2013_lc_nullable_and_infinity.sql @@ -0,0 +1,3 @@ +set receive_timeout = '10', receive_data_timeout_ms = '10000', extremes = '1', allow_suspicious_low_cardinality_types = '1', force_primary_key = '1', join_use_nulls = '1', max_rows_to_read = '1', join_algorithm = 'partial_merge'; + +SELECT * FROM (SELECT dummy AS val FROM system.one) AS s1 ANY LEFT JOIN (SELECT toLowCardinality(dummy) AS rval FROM system.one) AS s2 ON (val + 9223372036854775806) = (rval * 1); From 5bb0a91f83141e02b3fa948150b04b18d929ba23 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 27 Aug 2021 00:41:06 +0300 Subject: [PATCH 12/23] Fix system.zookeeper_log.address (Before it always contains "::") --- src/Interpreters/ZooKeeperLog.cpp | 2 +- .../queries/0_stateless/01158_zookeeper_log.reference | 10 +++++----- tests/queries/0_stateless/01158_zookeeper_log.sql | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Interpreters/ZooKeeperLog.cpp b/src/Interpreters/ZooKeeperLog.cpp index bc187876d24..39bd9a75f3e 100644 --- a/src/Interpreters/ZooKeeperLog.cpp +++ b/src/Interpreters/ZooKeeperLog.cpp @@ -158,7 +158,7 @@ void ZooKeeperLogElement::appendToBlock(MutableColumns & columns) const auto event_time_seconds = event_time / 1000000; columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType()); columns[i++]->insert(event_time); - columns[i++]->insert(IPv6ToBinary(address.host()).data()); + columns[i++]->insertData(IPv6ToBinary(address.host()).data(), 16); columns[i++]->insert(address.port()); columns[i++]->insert(session_id); diff --git a/tests/queries/0_stateless/01158_zookeeper_log.reference b/tests/queries/0_stateless/01158_zookeeper_log.reference index 35a30ee04e3..44451d82783 100644 --- a/tests/queries/0_stateless/01158_zookeeper_log.reference +++ b/tests/queries/0_stateless/01158_zookeeper_log.reference @@ -1,9 +1,9 @@ log -Response 0 Watch /test/01158/default/rmt/log 0 0 \N 0 0 ZOK CHILD CONNECTED 0 0 0 0 -Request 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 \N \N \N 0 0 0 0 -Response 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/log 0 0 0 0 -Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N 
\N \N 0 0 0 0 -Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0 +::1 Response 0 Watch /test/01158/default/rmt/log 0 0 \N 0 0 ZOK CHILD CONNECTED 0 0 0 0 +::1 Request 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 \N \N \N 0 0 0 0 +::1 Response 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/log 0 0 0 0 +::1 Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0 +::1 Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0 parts Request 0 Multi 0 0 \N 5 0 \N \N \N 0 0 0 0 Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0 diff --git a/tests/queries/0_stateless/01158_zookeeper_log.sql b/tests/queries/0_stateless/01158_zookeeper_log.sql index f3f1980b5a2..608dd8294de 100644 --- a/tests/queries/0_stateless/01158_zookeeper_log.sql +++ b/tests/queries/0_stateless/01158_zookeeper_log.sql @@ -6,7 +6,7 @@ insert into rmt values (1); system flush logs; select 'log'; -select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type, +select address, type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type, watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/log%' and op_num not in (3, 4, 12) order by xid, type, request_idx; From a63ef6b8eb9d259b0dbb655ae2a9cba525db87e8 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 27 Aug 2021 00:41:06 +0300 Subject: [PATCH 13/23] Avoid excessive getpeername(2) calls for zookeeper_log --- src/Common/ZooKeeper/ZooKeeperImpl.cpp | 3 ++- src/Common/ZooKeeper/ZooKeeperImpl.h | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 5f15a3b8b75..21af384badc 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -387,6 +387,7 @@ void ZooKeeper::connect( } socket.connect(node.address, connection_timeout); + socket_address = socket.peerAddress(); socket.setReceiveTimeout(operation_timeout); socket.setSendTimeout(operation_timeout); @@ -1255,7 +1256,7 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const { elem.type = log_type; elem.event_time = event_time; - elem.address = socket.peerAddress(); + elem.address = socket_address; elem.session_id = session_id; maybe_zk_log->add(elem); } diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 8f0f64ceafa..4361b93e538 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -199,6 +199,8 @@ private: Poco::Timespan operation_timeout; Poco::Net::StreamSocket socket; + /// To avoid excessive getpeername(2) calls. 
+ Poco::Net::SocketAddress socket_address; std::optional in; std::optional out; From 91621cfcd2bf83e71931697ae1de9a1b3abdfa14 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 27 Aug 2021 00:43:21 +0300 Subject: [PATCH 14/23] Fix reading of custom TLD w/o new line at EOF Fixes: #28177 --- src/Common/TLDListsHolder.cpp | 3 ++- tests/config/config.d/top_level_domains_lists.xml | 1 + tests/config/top_level_domains/no_new_line_list.dat | 1 + tests/queries/0_stateless/01601_custom_tld.reference | 4 ++++ tests/queries/0_stateless/01601_custom_tld.sql | 5 +++++ 5 files changed, 13 insertions(+), 1 deletion(-) create mode 100644 tests/config/top_level_domains/no_new_line_list.dat diff --git a/src/Common/TLDListsHolder.cpp b/src/Common/TLDListsHolder.cpp index 34bef8248b5..db0a762f826 100644 --- a/src/Common/TLDListsHolder.cpp +++ b/src/Common/TLDListsHolder.cpp @@ -64,7 +64,8 @@ size_t TLDListsHolder::parseAndAddTldList(const std::string & name, const std::s while (!in.eof()) { readEscapedStringUntilEOL(line, in); - ++in.position(); + if (!in.eof()) + ++in.position(); /// Skip comments if (line.size() > 2 && line[0] == '/' && line[1] == '/') continue; diff --git a/tests/config/config.d/top_level_domains_lists.xml b/tests/config/config.d/top_level_domains_lists.xml index 7b5e6a5638a..a10cbae1b43 100644 --- a/tests/config/config.d/top_level_domains_lists.xml +++ b/tests/config/config.d/top_level_domains_lists.xml @@ -1,5 +1,6 @@ public_suffix_list.dat + no_new_line_list.dat diff --git a/tests/config/top_level_domains/no_new_line_list.dat b/tests/config/top_level_domains/no_new_line_list.dat new file mode 100644 index 00000000000..4d5f9756e55 --- /dev/null +++ b/tests/config/top_level_domains/no_new_line_list.dat @@ -0,0 +1 @@ +foo.bar \ No newline at end of file diff --git a/tests/queries/0_stateless/01601_custom_tld.reference b/tests/queries/0_stateless/01601_custom_tld.reference index 04204ebf02a..ee326a77834 100644 --- a/tests/queries/0_stateless/01601_custom_tld.reference +++ b/tests/queries/0_stateless/01601_custom_tld.reference @@ -28,3 +28,7 @@ foo -- vector xx.blogspot.co.at +-- no new line +foo.bar +a.foo.bar +foo.baz diff --git a/tests/queries/0_stateless/01601_custom_tld.sql b/tests/queries/0_stateless/01601_custom_tld.sql index ceb00d5ff19..92ce28828f8 100644 --- a/tests/queries/0_stateless/01601_custom_tld.sql +++ b/tests/queries/0_stateless/01601_custom_tld.sql @@ -37,3 +37,8 @@ select cutToFirstSignificantSubdomainCustom('http://www.foo', 'public_suffix_lis select '-- vector'; select cutToFirstSignificantSubdomainCustom('http://xx.blogspot.co.at/' || toString(number), 'public_suffix_list') from numbers(1); select cutToFirstSignificantSubdomainCustom('there-is-no-such-domain' || toString(number), 'public_suffix_list') from numbers(1); + +select '-- no new line'; +select cutToFirstSignificantSubdomainCustom('foo.bar', 'no_new_line_list'); +select cutToFirstSignificantSubdomainCustom('a.foo.bar', 'no_new_line_list'); +select cutToFirstSignificantSubdomainCustom('a.foo.baz', 'no_new_line_list'); From 124e2526818636f45ad1d7439d3e2dc4abf4ce84 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 27 Aug 2021 09:39:08 +0300 Subject: [PATCH 15/23] Avoid 01158_zookeeper_log depends on cleanup_delay_period Like in [1]. 
[1]: https://clickhouse-test-reports.s3.yandex.net/28212/a63ef6b8eb9d259b0dbb655ae2a9cba525db87e8/functional_stateless_tests_flaky_check_(address).html#fail1 --- tests/queries/0_stateless/01158_zookeeper_log.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/01158_zookeeper_log.sql b/tests/queries/0_stateless/01158_zookeeper_log.sql index 608dd8294de..8f4a9f04f9b 100644 --- a/tests/queries/0_stateless/01158_zookeeper_log.sql +++ b/tests/queries/0_stateless/01158_zookeeper_log.sql @@ -1,5 +1,7 @@ drop table if exists rmt; -create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n; +-- cleanup code will perform extra Exists +-- (so the .reference will not match) +create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n settings cleanup_delay_period=86400; system sync replica rmt; insert into rmt values (1); insert into rmt values (1); From 44db6e29987868d197541604549efb3ae1bf6597 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 27 Aug 2021 09:39:41 +0300 Subject: [PATCH 16/23] Mark 01158_zookeeper_log as long According to one of flaky check (address) runs [1] it is possible for this test to run out of 60 seconds. [1]: https://clickhouse-test-reports.s3.yandex.net/28212/a63ef6b8eb9d259b0dbb655ae2a9cba525db87e8/functional_stateless_tests_flaky_check_(address).html#fail1 --- ...zookeeper_log.reference => 01158_zookeeper_log_long.reference} | 0 .../{01158_zookeeper_log.sql => 01158_zookeeper_log_long.sql} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename tests/queries/0_stateless/{01158_zookeeper_log.reference => 01158_zookeeper_log_long.reference} (100%) rename tests/queries/0_stateless/{01158_zookeeper_log.sql => 01158_zookeeper_log_long.sql} (100%) diff --git a/tests/queries/0_stateless/01158_zookeeper_log.reference b/tests/queries/0_stateless/01158_zookeeper_log_long.reference similarity index 100% rename from tests/queries/0_stateless/01158_zookeeper_log.reference rename to tests/queries/0_stateless/01158_zookeeper_log_long.reference diff --git a/tests/queries/0_stateless/01158_zookeeper_log.sql b/tests/queries/0_stateless/01158_zookeeper_log_long.sql similarity index 100% rename from tests/queries/0_stateless/01158_zookeeper_log.sql rename to tests/queries/0_stateless/01158_zookeeper_log_long.sql From a90c6fdba5a1d31c014b70b38e80efeae79de4f9 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 27 Aug 2021 11:57:23 +0300 Subject: [PATCH 17/23] Fix --- src/Common/ErrorCodes.cpp | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 084a0e3e93b..2110a8c7d6d 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -577,14 +577,13 @@ M(606, BACKUP_IS_EMPTY) \ M(607, BACKUP_ELEMENT_DUPLICATE) \ M(608, CANNOT_RESTORE_TABLE) \ + M(609, FUNCTION_ALREADY_EXISTS) \ + M(610, CANNOT_DROP_SYSTEM_FUNCTION) \ + M(611, CANNOT_CREATE_RECURSIVE_FUNCTION) \ + M(612, OBJECT_ALREADY_STORED_ON_DISK) \ + M(613, OBJECT_WAS_NOT_STORED_ON_DISK) \ + M(614, POSTGRESQL_CONNECTION_FAILURE) \ \ - M(598, FUNCTION_ALREADY_EXISTS) \ - M(599, CANNOT_DROP_SYSTEM_FUNCTION) \ - M(600, CANNOT_CREATE_RECURSIVE_FUNCTION) \ - M(601, OBJECT_ALREADY_STORED_ON_DISK) \ - M(602, OBJECT_WAS_NOT_STORED_ON_DISK) \ - \ - M(998, POSTGRESQL_CONNECTION_FAILURE) \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ M(1001, STD_EXCEPTION) \ From 7264efc053ff9d603e70be3f1ce53f3558e478bd Mon Sep 17 00:00:00 
2001 From: Aimiyoo Date: Fri, 27 Aug 2021 17:54:15 +0800 Subject: [PATCH 18/23] Update tutorial.md (#28235) --- docs/zh/getting-started/tutorial.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/getting-started/tutorial.md b/docs/zh/getting-started/tutorial.md index 4b1427cad2a..902064936f5 100644 --- a/docs/zh/getting-started/tutorial.md +++ b/docs/zh/getting-started/tutorial.md @@ -520,7 +520,7 @@ WHERE (CounterID = 912887) AND (toYYYYMM(StartDate) = 201403) AND (domain(StartU ClickHouse集群是一个同质集群。 设置步骤: 1. 在群集的所有机器上安装ClickHouse服务端 -2. 在配置文件中设置群集配置 +2. 在配置文件中设置集群配置 3. 在每个实例上创建本地表 4. 创建一个[分布式表](../engines/table-engines/special/distributed.md) From 0e169b42657ce14ca8395081177b9b694db1884d Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Fri, 27 Aug 2021 13:46:59 +0300 Subject: [PATCH 19/23] Dictionaries small fixes --- src/Common/filesystemHelpers.h | 1 + src/Dictionaries/FileDictionarySource.cpp | 5 +++-- src/Dictionaries/LibraryDictionarySource.cpp | 7 ++++--- src/Dictionaries/registerCacheDictionaries.cpp | 5 +++-- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/Common/filesystemHelpers.h b/src/Common/filesystemHelpers.h index 71ef7844ef7..551b4ee0fc5 100644 --- a/src/Common/filesystemHelpers.h +++ b/src/Common/filesystemHelpers.h @@ -35,6 +35,7 @@ bool pathStartsWith(const std::filesystem::path & path, const std::filesystem::p /// Returns true if path starts with prefix path bool pathStartsWith(const String & path, const String & prefix_path); +/// Returns true if symlink starts with prefix path bool symlinkStartsWith(const String & path, const String & prefix_path); } diff --git a/src/Dictionaries/FileDictionarySource.cpp b/src/Dictionaries/FileDictionarySource.cpp index 54ce5e4a448..5a77dc02673 100644 --- a/src/Dictionaries/FileDictionarySource.cpp +++ b/src/Dictionaries/FileDictionarySource.cpp @@ -32,8 +32,9 @@ FileDictionarySource::FileDictionarySource( , sample_block{sample_block_} , context(context_) { - if (created_from_ddl && !pathStartsWith(filepath, context->getUserFilesPath())) - throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", filepath, context->getUserFilesPath()); + auto user_files_path = context->getUserFilesPath(); + if (created_from_ddl && !pathStartsWith(filepath, user_files_path)) + throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", filepath, user_files_path); } diff --git a/src/Dictionaries/LibraryDictionarySource.cpp b/src/Dictionaries/LibraryDictionarySource.cpp index 2a47d4c9172..74da196bc73 100644 --- a/src/Dictionaries/LibraryDictionarySource.cpp +++ b/src/Dictionaries/LibraryDictionarySource.cpp @@ -41,14 +41,15 @@ LibraryDictionarySource::LibraryDictionarySource( , sample_block{sample_block_} , context(Context::createCopy(context_)) { + auto dictionaries_lib_path = context->getDictionariesLibPath(); bool path_checked = false; if (fs::is_symlink(path)) - path_checked = symlinkStartsWith(path, context->getDictionariesLibPath()); + path_checked = symlinkStartsWith(path, dictionaries_lib_path); else - path_checked = pathStartsWith(path, context->getDictionariesLibPath()); + path_checked = pathStartsWith(path, dictionaries_lib_path); if (created_from_ddl && !path_checked) - throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, context->getDictionariesLibPath()); + throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, dictionaries_lib_path); if (!fs::exists(path)) throw 
Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", path); diff --git a/src/Dictionaries/registerCacheDictionaries.cpp b/src/Dictionaries/registerCacheDictionaries.cpp index 69197f992f0..88e01a31402 100644 --- a/src/Dictionaries/registerCacheDictionaries.cpp +++ b/src/Dictionaries/registerCacheDictionaries.cpp @@ -213,8 +213,9 @@ DictionaryPtr createCacheDictionaryLayout( else { auto storage_configuration = parseSSDCacheStorageConfiguration(config, full_name, layout_type, dictionary_layout_prefix, dict_lifetime); - if (created_from_ddl && !pathStartsWith(storage_configuration.file_path, global_context->getUserFilesPath())) - throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", storage_configuration.file_path, global_context->getUserFilesPath()); + auto user_files_path = global_context->getUserFilesPath(); + if (created_from_ddl && !pathStartsWith(storage_configuration.file_path, user_files_path)) + throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", storage_configuration.file_path, user_files_path); storage = std::make_shared>(storage_configuration); } From 3f7259f9550e2598143d46dd74645a640a37b5fd Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 27 Aug 2021 16:26:26 +0300 Subject: [PATCH 20/23] Check error codes in style-check, update test --- tests/queries/0_stateless/01856_create_function.sql | 10 +++++----- utils/check-style/check-style | 7 +++++++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/queries/0_stateless/01856_create_function.sql b/tests/queries/0_stateless/01856_create_function.sql index 7e2f38c2415..3dc6c9f4445 100644 --- a/tests/queries/0_stateless/01856_create_function.sql +++ b/tests/queries/0_stateless/01856_create_function.sql @@ -3,11 +3,11 @@ SELECT 01856_test_function_0(2, 3, 4); SELECT isConstant(01856_test_function_0(1, 2, 3)); DROP FUNCTION 01856_test_function_0; CREATE FUNCTION 01856_test_function_1 AS (a, b) -> a || b || c; --{serverError 47} -CREATE FUNCTION 01856_test_function_1 AS (a, b) -> 01856_test_function_1(a, b) + 01856_test_function_1(a, b); --{serverError 600} -CREATE FUNCTION cast AS a -> a + 1; --{serverError 598} -CREATE FUNCTION sum AS (a, b) -> a + b; --{serverError 598} +CREATE FUNCTION 01856_test_function_1 AS (a, b) -> 01856_test_function_1(a, b) + 01856_test_function_1(a, b); --{serverError 611} +CREATE FUNCTION cast AS a -> a + 1; --{serverError 609} +CREATE FUNCTION sum AS (a, b) -> a + b; --{serverError 609} CREATE FUNCTION 01856_test_function_2 AS (a, b) -> a + b; -CREATE FUNCTION 01856_test_function_2 AS (a) -> a || '!!!'; --{serverError 598} +CREATE FUNCTION 01856_test_function_2 AS (a) -> a || '!!!'; --{serverError 609} DROP FUNCTION 01856_test_function_2; DROP FUNCTION unknown_function; -- {serverError 46} -DROP FUNCTION CAST; -- {serverError 599} +DROP FUNCTION CAST; -- {serverError 610} diff --git a/utils/check-style/check-style b/utils/check-style/check-style index 4363ba0cfae..68479f5f5b0 100755 --- a/utils/check-style/check-style +++ b/utils/check-style/check-style @@ -222,3 +222,10 @@ find $ROOT_PATH/{src,base,programs,utils,tests,docs,website,cmake} -name '*.md' # Forbid subprocess.check_call(...) 
in integration tests because it does not provide enough information on errors find $ROOT_PATH'/tests/integration' -name '*.py' | xargs grep -F 'subprocess.check_call' | grep -v "STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL" && echo "Use helpers.cluster.run_and_check or subprocess.run instead of subprocess.check_call to print detailed info on error" + +# Forbid non-unique error codes +if [[ "$(grep -Po "M\([0-9]*," $ROOT_PATH/src/Common/ErrorCodes.cpp | wc -l)" != "$(grep -Po "M\([0-9]*," $ROOT_PATH/src/Common/ErrorCodes.cpp | sort | uniq | wc -l)" ]] +then + echo "ErrorCodes.cpp contains non-unique error codes" +fi + From 89e1d1f64a7ba63400867ae2d872183fed58e046 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 27 Aug 2021 16:55:55 +0300 Subject: [PATCH 21/23] fix order by for StorageMerge with optimize_read_in_order --- src/Storages/StorageMerge.cpp | 5 +++-- .../02014_storage_merge_order_by.reference | 5 +++++ .../02014_storage_merge_order_by.sql | 22 +++++++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) create mode 100644 tests/queries/0_stateless/02014_storage_merge_order_by.reference create mode 100644 tests/queries/0_stateless/02014_storage_merge_order_by.sql diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 4e50b78ea8e..d537ef05cdb 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -338,9 +338,10 @@ Pipe StorageMerge::read( auto pipe = Pipe::unitePipes(std::move(pipes)); - if (!pipe.empty()) + if (!pipe.empty() && !query_info.input_order_info) // It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time. - // Using narrowPipe instead. + // Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it, + // because narrowPipe doesn't preserve order. 
narrowPipe(pipe, num_streams); return pipe; diff --git a/tests/queries/0_stateless/02014_storage_merge_order_by.reference b/tests/queries/0_stateless/02014_storage_merge_order_by.reference new file mode 100644 index 00000000000..0bb816b3987 --- /dev/null +++ b/tests/queries/0_stateless/02014_storage_merge_order_by.reference @@ -0,0 +1,5 @@ +20 +20 +20 +20 +20 diff --git a/tests/queries/0_stateless/02014_storage_merge_order_by.sql b/tests/queries/0_stateless/02014_storage_merge_order_by.sql new file mode 100644 index 00000000000..5b9789ae1d9 --- /dev/null +++ b/tests/queries/0_stateless/02014_storage_merge_order_by.sql @@ -0,0 +1,22 @@ +DROP TABLE IF EXISTS short; +DROP TABLE IF EXISTS long; +DROP TABLE IF EXISTS merged; + +CREATE TABLE short (e Int64, t DateTime ) ENGINE = MergeTree PARTITION BY e ORDER BY t; +CREATE TABLE long (e Int64, t DateTime ) ENGINE = MergeTree PARTITION BY (e, toStartOfMonth(t)) ORDER BY t; + +insert into short select number % 11, toDateTime('2021-01-01 00:00:00') + number from numbers(1000); +insert into long select number % 11, toDateTime('2021-01-01 00:00:00') + number from numbers(1000); + +CREATE TABLE merged as short ENGINE = Merge(currentDatabase(), 'short|long'); + +select sum(e) from (select * from merged order by t limit 10) SETTINGS optimize_read_in_order = 0; + +select sum(e) from (select * from merged order by t limit 10) SETTINGS max_threads = 1; +select sum(e) from (select * from merged order by t limit 10) SETTINGS max_threads = 3; +select sum(e) from (select * from merged order by t limit 10) SETTINGS max_threads = 10; +select sum(e) from (select * from merged order by t limit 10) SETTINGS max_threads = 50; + +DROP TABLE IF EXISTS short; +DROP TABLE IF EXISTS long; +DROP TABLE IF EXISTS merged; From f75d93e97e441bbcbfc485375fc8da1f3ba0317e Mon Sep 17 00:00:00 2001 From: Vitalii S Date: Fri, 27 Aug 2021 13:48:19 -0400 Subject: [PATCH 22/23] Fixed few typos --- .../engines/table-engines/mergetree-family/graphitemergetree.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ru/engines/table-engines/mergetree-family/graphitemergetree.md b/docs/ru/engines/table-engines/mergetree-family/graphitemergetree.md index fab796b8413..9cd8eda0b87 100644 --- a/docs/ru/engines/table-engines/mergetree-family/graphitemergetree.md +++ b/docs/ru/engines/table-engines/mergetree-family/graphitemergetree.md @@ -171,4 +171,4 @@ default !!! warning "Внимание" - Прореживание данных производится во время слияний. Обычно для старых партций слияния не запускаются, поэтому для прореживания надо иницировать незапланированное слияние используя [optimize](../../../sql-reference/statements/optimize.md). Или использовать дополнительные инструменты, например [graphite-ch-optimizer](https://github.com/innogames/graphite-ch-optimizer). + Прореживание данных производится во время слияний. Обычно для старых партиций слияния не запускаются, поэтому для прореживания надо инициировать незапланированное слияние используя [optimize](../../../sql-reference/statements/optimize.md). Или использовать дополнительные инструменты, например [graphite-ch-optimizer](https://github.com/innogames/graphite-ch-optimizer). 
From 2990cdfc6da7d43594c2d87aa1ca4e842d2cbacb Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Fri, 27 Aug 2021 23:59:56 +0300 Subject: [PATCH 23/23] Update check-style --- utils/check-style/check-style | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/check-style/check-style b/utils/check-style/check-style index 68479f5f5b0..9b9daa9470a 100755 --- a/utils/check-style/check-style +++ b/utils/check-style/check-style @@ -224,7 +224,7 @@ find $ROOT_PATH'/tests/integration' -name '*.py' | xargs grep -F 'subprocess.check_call' | grep -v "STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL" && echo "Use helpers.cluster.run_and_check or subprocess.run instead of subprocess.check_call to print detailed info on error" # Forbid non-unique error codes -if [[ "$(grep -Po "M\([0-9]*," $ROOT_PATH/src/Common/ErrorCodes.cpp | wc -l)" != "$(grep -Po "M\([0-9]*," $ROOT_PATH/src/Common/ErrorCodes.cpp | sort | uniq | wc -l)" ]] +if [[ "$(grep -Po "M\([0-9]*," $ROOT_PATH/src/Common/ErrorCodes.cpp | wc -l)" != "$(grep -Po "M\([0-9]*," $ROOT_PATH/src/Common/ErrorCodes.cpp | sort | uniq | wc -l)" ]] then echo "ErrorCodes.cpp contains non-unique error codes" fi
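
The error-code uniqueness rule that PATCH 20 introduces (and PATCH 23 touches) can also be reproduced by hand outside of CI. The sketch below is an illustrative variant, not the check-style code itself: it assumes a ClickHouse source tree rooted at $ROOT_PATH (the same variable the check-style script uses) and GNU grep with -P support, and it uses uniq -d to print the offending codes instead of only comparing counts.

    #!/usr/bin/env bash
    # Minimal sketch: list duplicated numeric codes in ErrorCodes.cpp.
    # Assumes GNU grep (-P) and a ClickHouse checkout at $ROOT_PATH.
    ROOT_PATH=${ROOT_PATH:-.}
    dups="$(grep -Po 'M\([0-9]*,' "$ROOT_PATH/src/Common/ErrorCodes.cpp" | sort | uniq -d)"
    if [ -n "$dups" ]; then
        echo "ErrorCodes.cpp contains non-unique error codes:"
        echo "$dups"
    fi

If the output is empty, every M(NNN, NAME) entry uses a unique code, which is what the CI check enforces; printing the duplicates directly (rather than comparing wc -l of the raw and uniq'd lists, as the committed check does) makes it easier to see which codes collide when renumbering, as happened in PATCH 17.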