From edf4e09fb2bc1d8a0a058941f6cb7f13d063c62b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Thu, 29 Aug 2024 18:46:06 +0300 Subject: [PATCH 01/26] Remove stale moving parts without zookeeper --- src/Common/FailPoint.cpp | 1 + .../MergeTree/MergeTreePartsMover.cpp | 16 ++- .../__init__.py | 0 .../test_remove_stale_moving_parts/config.xml | 46 +++++++ .../test_remove_stale_moving_parts/test.py | 126 ++++++++++++++++++ 5 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 tests/integration/test_remove_stale_moving_parts/__init__.py create mode 100644 tests/integration/test_remove_stale_moving_parts/config.xml create mode 100644 tests/integration/test_remove_stale_moving_parts/test.py diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp index b2fcbc77c56..bc4f2cb43d2 100644 --- a/src/Common/FailPoint.cpp +++ b/src/Common/FailPoint.cpp @@ -63,6 +63,7 @@ static struct InitFiu REGULAR(keepermap_fail_drop_data) \ REGULAR(lazy_pipe_fds_fail_close) \ PAUSEABLE(infinite_sleep) \ + PAUSEABLE(stop_moving_part_before_swap_with_active) \ namespace FailPoints diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp index 9223d6fd5b1..8daa48a2339 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -15,6 +16,11 @@ namespace ErrorCodes extern const int DIRECTORY_ALREADY_EXISTS; } +namespace FailPoints +{ + extern const char stop_moving_part_before_swap_with_active[]; +} + namespace { @@ -272,7 +278,13 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me cloned_part.part = std::move(builder).withPartFormatFromDisk().build(); LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath()); - cloned_part.part->is_temp = data->allowRemoveStaleMovingParts(); + if (data->allowRemoveStaleMovingParts()) + { + cloned_part.part->is_temp = data->allowRemoveStaleMovingParts(); + /// Setting it in case connection to zookeeper is lost while moving + /// Otherwise part might be stuck in the moving directory due to the KEEPER_EXCEPTION in part's destructor + cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS; + } cloned_part.part->loadColumnsChecksumsIndexes(true, true); cloned_part.part->loadVersionMetadata(); cloned_part.part->modification_time = cloned_part.part->getDataPartStorage().getLastModified().epochTime(); @@ -282,6 +294,8 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const { + /// Used to get some stuck parts in the moving directory by stopping moves while pause is active + FailPointInjection::pauseFailPoint(FailPoints::stop_moving_part_before_swap_with_active); if (moves_blocker.isCancelled()) throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts."); diff --git a/tests/integration/test_remove_stale_moving_parts/__init__.py b/tests/integration/test_remove_stale_moving_parts/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_remove_stale_moving_parts/config.xml b/tests/integration/test_remove_stale_moving_parts/config.xml new file mode 100644 index 00000000000..968e07fae51 --- /dev/null +++ b/tests/integration/test_remove_stale_moving_parts/config.xml @@ -0,0 +1,46 @@ + + + + + + ch1 + 9000 + + + + + + 01 + + + + + s3 + http://minio1:9001/root/data/ + minio + minio123 + + + + + + + default + False + + + s3 + False + + + 0.0 + + + + + + true + s3 + + true + diff --git a/tests/integration/test_remove_stale_moving_parts/test.py b/tests/integration/test_remove_stale_moving_parts/test.py new file mode 100644 index 00000000000..dcbcf38d25a --- /dev/null +++ b/tests/integration/test_remove_stale_moving_parts/test.py @@ -0,0 +1,126 @@ +from pathlib import Path +import time +import pytest +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +ch1 = cluster.add_instance( + "ch1", + main_configs=[ + "config.xml", + ], + macros={"replica": "node1"}, + with_zookeeper=True, + with_minio=True, +) + +DATABASE_NAME = "stale_moving_parts" + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def q(node, query): + return node.query(database=DATABASE_NAME, sql=query) + + +# .../disks/s3/store/ +def get_table_path(node, table): + return ( + node.query( + sql=f"SELECT data_paths FROM system.tables WHERE table = '{table}' and database = '{DATABASE_NAME}' LIMIT 1" + ) + .strip('"\n[]') + .split(",")[1] + .strip("'") + ) + + +def exec(node, cmd, path): + return node.exec_in_container( + [ + "bash", + "-c", + f"{cmd} {path}", + ] + ) + + +def stop_zookeeper(node): + node.exec_in_container(["bash", "-c", "/opt/zookeeper/bin/zkServer.sh stop"]) + timeout = time.time() + 60 + while node.get_process_pid("zookeeper") != None: + if time.time() > timeout: + raise Exception("Failed to stop ZooKeeper in 60 secs") + time.sleep(0.2) + + +def wait_part_is_stuck(node, table_moving_path, moving_part): + num_tries = 5 + while q(node, "SELECT part_name FROM system.moves").strip() != moving_part: + if num_tries == 0: + raise Exception("Part has not started to move") + num_tries -= 1 + time.sleep(1) + num_tries = 5 + while exec(node, "ls", table_moving_path).strip() != moving_part: + if num_tries == 0: + raise Exception("Part is not stuck in the moving directory") + num_tries -= 1 + time.sleep(1) + + +def wait_zookeeper_node_to_start(zk_nodes, timeout=60): + start = time.time() + while time.time() - start < timeout: + try: + for instance in zk_nodes: + conn = cluster.get_kazoo_client(instance) + conn.get_children("/") + print("All instances of ZooKeeper started") + return + except Exception as ex: + print(("Can't connect to ZooKeeper " + str(ex))) + time.sleep(0.5) + + +def test_remove_stale_moving_parts_without_zookeeper(started_cluster): + ch1.query(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}") + + q( + ch1, + "CREATE TABLE test_remove ON CLUSTER cluster ( id UInt32 ) ENGINE ReplicatedMergeTree() ORDER BY id;", + ) + + table_moving_path = Path(get_table_path(ch1, "test_remove")) / "moving" + + q(ch1, "SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active") + q(ch1, "INSERT INTO test_remove SELECT number FROM numbers(100);") + moving_part = "all_0_0_0" + move_response = ch1.get_query_request( + sql=f"ALTER TABLE test_remove MOVE PART '{moving_part}' TO DISK 's3'", + database=DATABASE_NAME, + ) + + wait_part_is_stuck(ch1, table_moving_path, moving_part) + + cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"]) + # Stop moves in case table is not read-only yet + q(ch1, "SYSTEM STOP MOVES") + q(ch1, "SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active") + + assert "Cancelled moving parts" in move_response.get_error() + assert exec(ch1, "ls", table_moving_path).strip() == "" + + cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"]) + wait_zookeeper_node_to_start(["zoo1", "zoo2", "zoo3"]) + q(ch1, "SYSTEM START MOVES") + + q(ch1, f"DROP TABLE test_remove") From b2c4b771d8f1fc83d4096128231130dc93e9fd79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Thu, 29 Aug 2024 19:33:04 +0300 Subject: [PATCH 02/26] Minor fixes --- src/Storages/MergeTree/MergeTreePartsMover.cpp | 3 ++- tests/integration/test_remove_stale_moving_parts/test.py | 9 --------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp index 8daa48a2339..11a55fe2420 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -278,9 +278,10 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me cloned_part.part = std::move(builder).withPartFormatFromDisk().build(); LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath()); + cloned_part.part->is_temp = false; if (data->allowRemoveStaleMovingParts()) { - cloned_part.part->is_temp = data->allowRemoveStaleMovingParts(); + cloned_part.part->is_temp = true; /// Setting it in case connection to zookeeper is lost while moving /// Otherwise part might be stuck in the moving directory due to the KEEPER_EXCEPTION in part's destructor cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS; diff --git a/tests/integration/test_remove_stale_moving_parts/test.py b/tests/integration/test_remove_stale_moving_parts/test.py index dcbcf38d25a..f7cb4e5817e 100644 --- a/tests/integration/test_remove_stale_moving_parts/test.py +++ b/tests/integration/test_remove_stale_moving_parts/test.py @@ -53,15 +53,6 @@ def exec(node, cmd, path): ) -def stop_zookeeper(node): - node.exec_in_container(["bash", "-c", "/opt/zookeeper/bin/zkServer.sh stop"]) - timeout = time.time() + 60 - while node.get_process_pid("zookeeper") != None: - if time.time() > timeout: - raise Exception("Failed to stop ZooKeeper in 60 secs") - time.sleep(0.2) - - def wait_part_is_stuck(node, table_moving_path, moving_part): num_tries = 5 while q(node, "SELECT part_name FROM system.moves").strip() != moving_part: From 32cfdc98b207ca89c1051fc0141d5a2231f9d5e9 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Wed, 4 Sep 2024 18:00:03 +0000 Subject: [PATCH 03/26] fix metadata_version in keeper --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 6 +++ .../MergeTree/ReplicatedMergeTreeQueue.h | 1 + src/Storages/StorageReplicatedMergeTree.cpp | 50 ++++++++++++++++++- src/Storages/StorageReplicatedMergeTree.h | 1 + 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 627bda3f8bf..9004f986c5e 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -2128,6 +2128,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.inserts_in_queue = 0; res.merges_in_queue = 0; res.part_mutations_in_queue = 0; + res.metadata_alters_in_queue = 0; res.queue_oldest_time = 0; res.inserts_oldest_time = 0; res.merges_oldest_time = 0; @@ -2170,6 +2171,11 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.oldest_part_to_mutate_to = entry->new_part_name; } } + + if (entry->type == LogEntry::ALTER_METADATA) + { + ++res.metadata_alters_in_queue; + } } return res; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 89ef6240558..2011d84eefe 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -453,6 +453,7 @@ public: UInt32 inserts_in_queue; UInt32 merges_in_queue; UInt32 part_mutations_in_queue; + UInt32 metadata_alters_in_queue; UInt32 queue_oldest_time; UInt32 inserts_oldest_time; UInt32 merges_oldest_time; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ff8e362aa36..7167afa7fd3 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6155,6 +6155,8 @@ void StorageReplicatedMergeTree::alter( mutation_znode.reset(); auto current_metadata = getInMemoryMetadataPtr(); + // update metadata's metadata_version + // fixReplicaMetadataVersionIfNeeded(current_metadata->metadata_version); StorageInMemoryMetadata future_metadata = *current_metadata; commands.apply(future_metadata, query_context); @@ -6200,7 +6202,8 @@ void StorageReplicatedMergeTree::alter( size_t mutation_path_idx = std::numeric_limits::max(); String new_metadata_str = future_metadata_in_zk.toString(); - ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion())); + Int32 metadata_version = fixMetadataVersionInZooKeeper(); + ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version)); String new_columns_str = future_metadata.columns.toString(); ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1)); @@ -10641,4 +10644,49 @@ template std::optional StorageReplicatedMergeTree::all const std::vector & zookeeper_block_id_path, const String & zookeeper_path_prefix) const; +Int32 StorageReplicatedMergeTree::tryFixMetadataVersionInZooKeeper() +{ + const Int32 metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(); + if (metadata_version != 0) + { + /// No need to fix anything + return metadata_version; + } + + auto zookeeper = getZooKeeper(); + + Coordination::Stat stat; + zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); + if (stat.version == 0) + { + /// No need to fix anything + return metadata_version; + } + + queue.pullLogsToQueue(zookeeper); + if (queue.getStatus().metadata_alters_in_queue != 0) + { + LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); + return metadata_version; + } + + const Coordination::Requests ops = { + zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), + zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), + }; + Coordination::Responses ops_responses; + const auto code = current_zookeeper->tryMulti(ops, ops_responses); + if (code == Coordination::Error::ZOK) + { + LOG_DEBUG(log, "Successfully fixed metadata_version"); + return stat.version; + } + if (code == Coordination::Error::ZBADVERSION) + { + LOG_DEBUG(log, "No need to update metadata_version because table metadata has been updated on a different replica"); + return metadata_version; + } + throw zkutil::KeeperException(code); +} + } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 2e54f17d5d5..e591a800ea2 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -1013,6 +1013,7 @@ private: DataPartsVector::const_iterator it; }; + Int32 tryFixMetadataVersionInZooKeeper(); }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); From be55e1d2e166c033aaa369970d3c5b21cfda5807 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Thu, 5 Sep 2024 18:38:59 +0000 Subject: [PATCH 04/26] better --- .../ReplicatedMergeTreeAttachThread.cpp | 67 ++++++++++++++++++- .../ReplicatedMergeTreeAttachThread.h | 2 + src/Storages/StorageReplicatedMergeTree.cpp | 50 +------------- src/Storages/StorageReplicatedMergeTree.h | 2 - 4 files changed, 67 insertions(+), 54 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index 6e22a3515bc..f28b8f9e9a8 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -15,6 +15,7 @@ namespace ErrorCodes { extern const int SUPPORT_IS_DISABLED; extern const int REPLICA_STATUS_CHANGED; + extern const int LOGICAL_ERROR; } ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_) @@ -117,6 +118,66 @@ void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const z } } +Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper) +{ + const String & zookeeper_path = storage.zookeeper_path; + const String & replica_path = storage.replica_path; + + for (size_t i = 0; i != 2; ++i) + { + String replica_metadata_version_str; + const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version_str); + if (!replica_metadata_version_exists) + return -1; + + const Int32 metadata_version = parse(replica_metadata_version_str); + + if (metadata_version != 0) + { + /// No need to fix anything + return metadata_version; + } + + Coordination::Stat stat; + zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); + if (stat.version == 0) + { + /// No need to fix anything + return metadata_version; + } + + ReplicatedMergeTreeQueue & queue = storage.queue; + queue.pullLogsToQueue(zookeeper); + if (queue.getStatus().metadata_alters_in_queue != 0) + { + LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); + return metadata_version; + } + + const Coordination::Requests ops = { + zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), + zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), + }; + Coordination::Responses ops_responses; + const auto code = zookeeper->tryMulti(ops, ops_responses); + if (code == Coordination::Error::ZOK) + { + LOG_DEBUG(log, "Successfully set metadata_version to {}", stat.version); + return stat.version; + } + if (code != Coordination::Error::ZBADVERSION) + { + throw zkutil::KeeperException(code); + } + } + + /// Second attempt is only possible if metadata_version != 0 or metadata.version changed during the first attempt. + /// If metadata_version != 0, on second attempt we will return the new metadata_version. + /// If metadata.version changed, on second attempt we will either get metadata_version != 0 and return the new metadata_version or we will get metadata_alters_in_queue != 0 and return 0. + /// Either way, on second attempt this method should return. + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fix replica metadata_version in ZooKeeper after two attempts"); +} + void ReplicatedMergeTreeAttachThread::runImpl() { storage.setZooKeeper(); @@ -160,11 +221,11 @@ void ReplicatedMergeTreeAttachThread::runImpl() /// Just in case it was not removed earlier due to connection loss zookeeper->tryRemove(replica_path + "/flags/force_restore_data"); - String replica_metadata_version; - const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version); + const Int32 replica_metadata_version = fixReplicaMetadataVersionIfNeeded(zookeeper); + const bool replica_metadata_version_exists = replica_metadata_version != -1; if (replica_metadata_version_exists) { - storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse(replica_metadata_version))); + storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(replica_metadata_version)); } else { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h index 250a5ed34d1..bfc97442598 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h @@ -48,6 +48,8 @@ private: void runImpl(); void finalizeInitialization(); + + Int32 fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper); }; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 5632c24bca4..865a0cbe506 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6158,8 +6158,6 @@ void StorageReplicatedMergeTree::alter( mutation_znode.reset(); auto current_metadata = getInMemoryMetadataPtr(); - // update metadata's metadata_version - // fixReplicaMetadataVersionIfNeeded(current_metadata->metadata_version); StorageInMemoryMetadata future_metadata = *current_metadata; commands.apply(future_metadata, query_context); @@ -6205,8 +6203,7 @@ void StorageReplicatedMergeTree::alter( size_t mutation_path_idx = std::numeric_limits::max(); String new_metadata_str = future_metadata_in_zk.toString(); - Int32 metadata_version = fixMetadataVersionInZooKeeper(); - ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version)); + ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion())); String new_columns_str = future_metadata.columns.toString(); ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1)); @@ -10690,49 +10687,4 @@ template std::optional StorageReplicatedMergeTree::all const std::vector & zookeeper_block_id_path, const String & zookeeper_path_prefix) const; -Int32 StorageReplicatedMergeTree::tryFixMetadataVersionInZooKeeper() -{ - const Int32 metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(); - if (metadata_version != 0) - { - /// No need to fix anything - return metadata_version; - } - - auto zookeeper = getZooKeeper(); - - Coordination::Stat stat; - zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); - if (stat.version == 0) - { - /// No need to fix anything - return metadata_version; - } - - queue.pullLogsToQueue(zookeeper); - if (queue.getStatus().metadata_alters_in_queue != 0) - { - LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); - return metadata_version; - } - - const Coordination::Requests ops = { - zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), - zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), - }; - Coordination::Responses ops_responses; - const auto code = current_zookeeper->tryMulti(ops, ops_responses); - if (code == Coordination::Error::ZOK) - { - LOG_DEBUG(log, "Successfully fixed metadata_version"); - return stat.version; - } - if (code == Coordination::Error::ZBADVERSION) - { - LOG_DEBUG(log, "No need to update metadata_version because table metadata has been updated on a different replica"); - return metadata_version; - } - throw zkutil::KeeperException(code); -} - } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 15cfa77302b..c10f66031ef 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -1026,8 +1026,6 @@ private: const bool & zero_copy_enabled, const bool & always_use_copy_instead_of_hardlinks, const ContextPtr & query_context); - - Int32 tryFixMetadataVersionInZooKeeper(); }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); From cf12e3924f51fd08c884519352a46495412a771a Mon Sep 17 00:00:00 2001 From: flynn Date: Wed, 11 Sep 2024 09:31:46 +0000 Subject: [PATCH 05/26] Speedup insert data with vector similarity index by add data to index parallel --- src/Common/CurrentMetrics.cpp | 3 ++ .../MergeTreeIndexVectorSimilarity.cpp | 42 +++++++++++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 4bf2b0704f1..1d37b4761e1 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -178,6 +178,9 @@ M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \ M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \ M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \ + M(UsearchUpdateThreads, "Number of threads in the Aggregator thread pool.") \ + M(UsearchUpdateThreadsActive, "Number of threads in the Aggregator thread pool running a task.") \ + M(UsearchUpdateThreadsScheduled, "Number of queued or active jobs in the Aggregator thread pool.") \ \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \ diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index 58892d0dbf2..ad166839ce3 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -4,16 +4,20 @@ #include #include +#include #include +#include #include #include #include +#include #include #include #include #include #include + namespace ProfileEvents { extern const Event USearchAddCount; @@ -24,6 +28,13 @@ namespace ProfileEvents extern const Event USearchSearchComputedDistances; } +namespace CurrentMetrics +{ +extern const Metric UsearchUpdateThreads; +extern const Metric UsearchUpdateThreadsActive; +extern const Metric UsearchUpdateThreadsScheduled; +} + namespace DB { @@ -273,17 +284,42 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); - for (size_t row = 0; row < rows; ++row) + size_t max_threads = Context::getGlobalContextInstance()->getSettingsRef().max_threads; + max_threads = max_threads > 0 ? max_threads : getNumberOfPhysicalCPUCores(); + + auto thread_pool = std::make_unique( + CurrentMetrics::UsearchUpdateThreads, + CurrentMetrics::UsearchUpdateThreadsActive, + CurrentMetrics::UsearchUpdateThreadsScheduled, + max_threads); + + auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group) { - if (auto result = index->add(static_cast(index->size()), &column_array_data_float_data[column_array_offsets[row - 1]]); !result) - throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release())); + SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached();); + + if (thread_group) + CurrentThread::attachToGroupIfDetached(thread_group); + + if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result) + throw Exception( + ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release())); else { ProfileEvents::increment(ProfileEvents::USearchAddCount); ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members); ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances); } + }; + + size_t current_index_size = index->size(); + + for (size_t row = 0; row < rows; ++row) + { + auto key = static_cast(current_index_size + row); + auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); }; + thread_pool->scheduleOrThrowOnError(task); } + thread_pool->wait(); } } From 7425d4aa1ae690810db444796ec4c0a4469d3e76 Mon Sep 17 00:00:00 2001 From: flynn Date: Wed, 11 Sep 2024 10:12:42 +0000 Subject: [PATCH 06/26] remove blank line --- src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index ad166839ce3..6379c837281 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -17,7 +17,6 @@ #include #include - namespace ProfileEvents { extern const Event USearchAddCount; From 22c3b71196bc15c3229edae5f6abd7e59950bbe6 Mon Sep 17 00:00:00 2001 From: flynn Date: Thu, 12 Sep 2024 03:54:25 +0000 Subject: [PATCH 07/26] Make vector similarity index creation thread pool globally --- src/Core/ServerSettings.h | 3 ++- src/Interpreters/Context.cpp | 26 ++++++++++++++++++- src/Interpreters/Context.h | 2 ++ .../MergeTreeIndexVectorSimilarity.cpp | 21 +++------------ 4 files changed, 32 insertions(+), 20 deletions(-) diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index 79173503f28..2077967e5b6 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -50,7 +50,7 @@ namespace DB M(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating heavy asynchronous metrics.", 0) \ M(String, default_database, "default", "Default database name.", 0) \ M(String, tmp_policy, "", "Policy for storage with temporary data.", 0) \ - M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \ + M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting.", 0) \ M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \ M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \ M(GroupArrayActionWhenLimitReached, aggregate_function_group_array_action_when_limit_is_reached, GroupArrayActionWhenLimitReached::THROW, "Action to execute when max array element size is exceeded in groupArray: `throw` exception, or `discard` extra values", 0) \ @@ -65,6 +65,7 @@ namespace DB M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \ M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \ + M(UInt64, max_thread_pool_size_for_vector_similarity_index_creation, 16, "The maximum number of threads that could be allocated from the OS and used for vector similarity index creation(0 means all cores).", 0) \ \ /* Database Catalog */ \ M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 373cc91ebcb..051378ef6d4 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -120,7 +121,6 @@ #include #include - namespace fs = std::filesystem; namespace ProfileEvents @@ -168,6 +168,9 @@ namespace CurrentMetrics extern const Metric AttachedDictionary; extern const Metric AttachedDatabase; extern const Metric PartsActive; + extern const Metric UsearchUpdateThreads; + extern const Metric UsearchUpdateThreadsActive; + extern const Metric UsearchUpdateThreadsScheduled; } @@ -296,6 +299,8 @@ struct ContextSharedPart : boost::noncopyable mutable std::unique_ptr load_marks_threadpool; /// Threadpool for loading marks cache. mutable OnceFlag prefetch_threadpool_initialized; mutable std::unique_ptr prefetch_threadpool; /// Threadpool for loading marks cache. + mutable OnceFlag vector_similarity_index_creation_threadpool_initialized; + mutable std::unique_ptr vector_similarity_index_creation_threadpool; /// Threadpool for vector-similarity index creation. mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices. mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results. mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices. @@ -3095,6 +3100,25 @@ ThreadPool & Context::getLoadMarksThreadpool() const return *shared->load_marks_threadpool; } +ThreadPool & Context::getVectorSimilarityIndexCreationThreadPool() const +{ + callOnce( + shared->vector_similarity_index_creation_threadpool_initialized, + [&] + { + const auto & server_setting = getServerSettings(); + size_t max_thread_pool_size = server_setting.max_thread_pool_size_for_vector_similarity_index_creation > 0 + ? server_setting.max_thread_pool_size_for_vector_similarity_index_creation + : getNumberOfPhysicalCPUCores(); + shared->vector_similarity_index_creation_threadpool = std::make_unique( + CurrentMetrics::UsearchUpdateThreads, + CurrentMetrics::UsearchUpdateThreadsActive, + CurrentMetrics::UsearchUpdateThreadsScheduled, + max_thread_pool_size); + }); + return *shared->vector_similarity_index_creation_threadpool; +} + void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio) { std::lock_guard lock(shared->mutex); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index fb5337158ba..e8f6ce9b3e1 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1096,6 +1096,8 @@ public: /// and make a prefetch by putting a read task to threadpoolReader. size_t getPrefetchThreadpoolSize() const; + ThreadPool & getVectorSimilarityIndexCreationThreadPool() const; + /// Settings for MergeTree background tasks stored in config.xml BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const; BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const; diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index 6379c837281..e5ed19585f8 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -27,13 +26,6 @@ namespace ProfileEvents extern const Event USearchSearchComputedDistances; } -namespace CurrentMetrics -{ -extern const Metric UsearchUpdateThreads; -extern const Metric UsearchUpdateThreadsActive; -extern const Metric UsearchUpdateThreadsScheduled; -} - namespace DB { @@ -283,14 +275,7 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); - size_t max_threads = Context::getGlobalContextInstance()->getSettingsRef().max_threads; - max_threads = max_threads > 0 ? max_threads : getNumberOfPhysicalCPUCores(); - - auto thread_pool = std::make_unique( - CurrentMetrics::UsearchUpdateThreads, - CurrentMetrics::UsearchUpdateThreadsActive, - CurrentMetrics::UsearchUpdateThreadsScheduled, - max_threads); + auto & thread_pool = Context::getGlobalContextInstance()->getVectorSimilarityIndexCreationThreadPool(); auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group) { @@ -316,9 +301,9 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c { auto key = static_cast(current_index_size + row); auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); }; - thread_pool->scheduleOrThrowOnError(task); + thread_pool.scheduleOrThrowOnError(task); } - thread_pool->wait(); + thread_pool.wait(); } } From fe5e061fffe3ef448dfc8d22b5f1236b09e036ca Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Thu, 12 Sep 2024 10:16:29 +0000 Subject: [PATCH 08/26] Some fixups --- .../mergetree-family/annindexes.md | 4 ++ .../settings.md | 8 ++++ src/Common/CurrentMetrics.cpp | 6 +-- src/Core/ServerSettings.h | 2 +- src/Interpreters/Context.cpp | 44 +++++++++---------- src/Interpreters/Context.h | 2 +- .../MergeTreeIndexVectorSimilarity.cpp | 29 ++++++------ 7 files changed, 53 insertions(+), 42 deletions(-) diff --git a/docs/en/engines/table-engines/mergetree-family/annindexes.md b/docs/en/engines/table-engines/mergetree-family/annindexes.md index 3c75b8dbef0..b73700c40f4 100644 --- a/docs/en/engines/table-engines/mergetree-family/annindexes.md +++ b/docs/en/engines/table-engines/mergetree-family/annindexes.md @@ -107,6 +107,10 @@ The vector similarity index currently does not work with per-table, non-default [here](https://github.com/ClickHouse/ClickHouse/pull/51325#issuecomment-1605920475)). If necessary, the value must be changed in config.xml. ::: +Vector index creation is known to be slow. To speed the process up, index creation can be parallelized. The maximum number of threads can be +configured using server configuration +setting [max_build_vector_similarity_index_thread_pool_size](server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size). + ANN indexes are built during column insertion and merge. As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary tables. ANNIndexes are ideally used only with immutable or rarely changed data, respectively when are far more read requests than write requests. diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index ccc8cf017ca..14a23964100 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -491,6 +491,14 @@ Type: Double Default: 0.9 +## max_build_vector_similarity_index_thread_pool_size {#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size} + +The maximum number of threads to use for building vector indexes. 0 means all cores. + +Type: UInt64 + +Default: 16 + ## cgroups_memory_usage_observer_wait_time Interval in seconds during which the server's maximum allowed memory consumption is adjusted by the corresponding threshold in cgroups. (see diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 1d37b4761e1..658eaedbda1 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -178,9 +178,9 @@ M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \ M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \ M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \ - M(UsearchUpdateThreads, "Number of threads in the Aggregator thread pool.") \ - M(UsearchUpdateThreadsActive, "Number of threads in the Aggregator thread pool running a task.") \ - M(UsearchUpdateThreadsScheduled, "Number of queued or active jobs in the Aggregator thread pool.") \ + M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \ + M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \ + M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \ \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \ diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index b5071e6fa5d..689b18cb74f 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -65,7 +65,7 @@ namespace DB M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \ M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \ - M(UInt64, max_thread_pool_size_for_vector_similarity_index_creation, 16, "The maximum number of threads that could be allocated from the OS and used for vector similarity index creation(0 means all cores).", 0) \ + M(UInt64, max_build_vector_similarity_index_thread_pool_size, 16, "The maximum number of threads to use to build vector similarity indexes. 0 means all cores.", 0) \ \ /* Database Catalog */ \ M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 7350fdecd25..311fd094706 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -164,14 +164,14 @@ namespace CurrentMetrics extern const Metric TablesLoaderForegroundThreadsActive; extern const Metric TablesLoaderForegroundThreadsScheduled; extern const Metric IOWriterThreadsScheduled; + extern const Metric BuildVectorSimilarityIndexThreads; + extern const Metric BuildVectorSimilarityIndexThreadsActive; + extern const Metric BuildVectorSimilarityIndexThreadsScheduled; extern const Metric AttachedTable; extern const Metric AttachedView; extern const Metric AttachedDictionary; extern const Metric AttachedDatabase; extern const Metric PartsActive; - extern const Metric UsearchUpdateThreads; - extern const Metric UsearchUpdateThreadsActive; - extern const Metric UsearchUpdateThreadsScheduled; } @@ -300,8 +300,8 @@ struct ContextSharedPart : boost::noncopyable mutable std::unique_ptr load_marks_threadpool; /// Threadpool for loading marks cache. mutable OnceFlag prefetch_threadpool_initialized; mutable std::unique_ptr prefetch_threadpool; /// Threadpool for loading marks cache. - mutable OnceFlag vector_similarity_index_creation_threadpool_initialized; - mutable std::unique_ptr vector_similarity_index_creation_threadpool; /// Threadpool for vector-similarity index creation. + mutable OnceFlag build_vector_similarity_index_threadpool_initialized; + mutable std::unique_ptr build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation. mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices. mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results. mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices. @@ -3101,25 +3101,6 @@ ThreadPool & Context::getLoadMarksThreadpool() const return *shared->load_marks_threadpool; } -ThreadPool & Context::getVectorSimilarityIndexCreationThreadPool() const -{ - callOnce( - shared->vector_similarity_index_creation_threadpool_initialized, - [&] - { - const auto & server_setting = getServerSettings(); - size_t max_thread_pool_size = server_setting.max_thread_pool_size_for_vector_similarity_index_creation > 0 - ? server_setting.max_thread_pool_size_for_vector_similarity_index_creation - : getNumberOfPhysicalCPUCores(); - shared->vector_similarity_index_creation_threadpool = std::make_unique( - CurrentMetrics::UsearchUpdateThreads, - CurrentMetrics::UsearchUpdateThreadsActive, - CurrentMetrics::UsearchUpdateThreadsScheduled, - max_thread_pool_size); - }); - return *shared->vector_similarity_index_creation_threadpool; -} - void Context::setIndexUncompressedCache(const String & cache_policy, size_t max_size_in_bytes, double size_ratio) { std::lock_guard lock(shared->mutex); @@ -3321,6 +3302,21 @@ size_t Context::getPrefetchThreadpoolSize() const return config.getUInt(".prefetch_threadpool_pool_size", 100); } +ThreadPool & Context::getBuildVectorSimilarityIndexThreadPool() const +{ + callOnce(shared->build_vector_similarity_index_threadpool_initialized, [&] { + size_t pool_size = shared->server_settings.max_build_vector_similarity_index_thread_pool_size > 0 + ? shared->server_settings.max_build_vector_similarity_index_thread_pool_size + : getNumberOfPhysicalCPUCores(); + shared->build_vector_similarity_index_threadpool = std::make_unique( + CurrentMetrics::BuildVectorSimilarityIndexThreads, + CurrentMetrics::BuildVectorSimilarityIndexThreadsActive, + CurrentMetrics::BuildVectorSimilarityIndexThreadsScheduled, + pool_size); + }); + return *shared->build_vector_similarity_index_threadpool; +} + BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const { callOnce(shared->buffer_flush_schedule_pool_initialized, [&] { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index d309b964dbd..0daef2243aa 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1097,7 +1097,7 @@ public: /// and make a prefetch by putting a read task to threadpoolReader. size_t getPrefetchThreadpoolSize() const; - ThreadPool & getVectorSimilarityIndexCreationThreadPool() const; + ThreadPool & getBuildVectorSimilarityIndexThreadPool() const; /// Settings for MergeTree background tasks stored in config.xml BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const; diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index e5ed19585f8..8a850141a67 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -4,12 +4,10 @@ #include #include -#include #include #include #include #include -#include #include #include #include @@ -31,7 +29,6 @@ namespace DB namespace ErrorCodes { - extern const int CANNOT_ALLOCATE_MEMORY; extern const int FORMAT_VERSION_TOO_OLD; extern const int ILLEGAL_COLUMN; extern const int INCORRECT_DATA; @@ -133,8 +130,7 @@ void USearchIndexWithSerialization::deserialize(ReadBuffer & istr) /// See the comment in MergeTreeIndexGranuleVectorSimilarity::deserializeBinary why we throw here throw Exception(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index. Please drop the index and create it again. Error: {}", String(result.error.release())); - if (!try_reserve(limits())) - throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for usearch index"); + try_reserve(limits()); } USearchIndexWithSerialization::Statistics USearchIndexWithSerialization::getStatistics() const @@ -272,21 +268,27 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length"); /// Reserving space is mandatory - if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows))) - throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index"); + index->reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)); - auto & thread_pool = Context::getGlobalContextInstance()->getVectorSimilarityIndexCreationThreadPool(); + /// Vector index creation is slooooow. Add the new rows in parallel. The threadpool is global to avoid oversubscription when multiple + /// indexes are build simultaneously (e.g. multiple merges run at the same time). + auto & thread_pool = Context::getGlobalContextInstance()->getBuildVectorSimilarityIndexThreadPool(); auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group) { - SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached();); + SCOPE_EXIT_SAFE( + if (thread_group) + CurrentThread::detachFromGroupIfNotDetached(); + ); if (thread_group) CurrentThread::attachToGroupIfDetached(thread_group); + /// add is thread-safe if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result) - throw Exception( - ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release())); + { + throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release())); + } else { ProfileEvents::increment(ProfileEvents::USearchAddCount); @@ -295,14 +297,15 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c } }; - size_t current_index_size = index->size(); + size_t index_size = index->size(); for (size_t row = 0; row < rows; ++row) { - auto key = static_cast(current_index_size + row); + auto key = static_cast(index_size + row); auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); }; thread_pool.scheduleOrThrowOnError(task); } + thread_pool.wait(); } From 38b5ea9066a8eae29222e26595957d022d2de26c Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Thu, 12 Sep 2024 12:43:27 +0000 Subject: [PATCH 09/26] Fix docs --- docs/en/engines/table-engines/mergetree-family/annindexes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/engines/table-engines/mergetree-family/annindexes.md b/docs/en/engines/table-engines/mergetree-family/annindexes.md index b73700c40f4..f507e2b9f86 100644 --- a/docs/en/engines/table-engines/mergetree-family/annindexes.md +++ b/docs/en/engines/table-engines/mergetree-family/annindexes.md @@ -109,7 +109,7 @@ The vector similarity index currently does not work with per-table, non-default Vector index creation is known to be slow. To speed the process up, index creation can be parallelized. The maximum number of threads can be configured using server configuration -setting [max_build_vector_similarity_index_thread_pool_size](server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size). +setting [max_build_vector_similarity_index_thread_pool_size](../../../operations/server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size). ANN indexes are built during column insertion and merge. As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary tables. ANNIndexes are ideally used only with immutable or rarely changed data, respectively when are far more read requests than write From 042194e3f6e2ad940a3cedc921b771808d89305a Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Fri, 13 Sep 2024 08:50:28 +0000 Subject: [PATCH 10/26] enable removeRecursive in CI --- tests/config/config.d/keeper_port.xml | 1 + tests/docker_scripts/stress_tests.lib | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/config/config.d/keeper_port.xml b/tests/config/config.d/keeper_port.xml index 2b04d843a3b..709d6641806 100644 --- a/tests/config/config.d/keeper_port.xml +++ b/tests/config/config.d/keeper_port.xml @@ -42,6 +42,7 @@ 1 1 1 + 1 diff --git a/tests/docker_scripts/stress_tests.lib b/tests/docker_scripts/stress_tests.lib index cc4c290afef..fb63ef81f80 100644 --- a/tests/docker_scripts/stress_tests.lib +++ b/tests/docker_scripts/stress_tests.lib @@ -64,6 +64,7 @@ function configure() randomize_config_boolean_value multi_read keeper_port randomize_config_boolean_value check_not_exists keeper_port randomize_config_boolean_value create_if_not_exists keeper_port + randomize_config_boolean_value remove_recursive keeper_port fi sudo chown clickhouse /etc/clickhouse-server/config.d/keeper_port.xml From 08fd6c8ab6df952775a978a3aebcfcd3bf4ecbf3 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Tue, 10 Sep 2024 10:30:34 +0800 Subject: [PATCH 11/26] have_compressed is lost in reuseJoinedData --- src/Interpreters/HashJoin/HashJoin.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Interpreters/HashJoin/HashJoin.cpp b/src/Interpreters/HashJoin/HashJoin.cpp index 1b8b45b94ea..230e4cd9691 100644 --- a/src/Interpreters/HashJoin/HashJoin.cpp +++ b/src/Interpreters/HashJoin/HashJoin.cpp @@ -1236,6 +1236,7 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block, void HashJoin::reuseJoinedData(const HashJoin & join) { + have_compressed = join.have_compressed; data = join.data; from_storage_join = true; From 9ca149a487beef10de0864f1381927ebf4514b76 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Fri, 13 Sep 2024 11:07:09 +0000 Subject: [PATCH 12/26] Fix GWP-asan crash --- src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index 8a850141a67..641770b16e9 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -268,7 +269,9 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length"); /// Reserving space is mandatory - index->reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)); + size_t max_thread_pool_size = Context::getGlobalContextInstance()->getServerSettings().max_build_vector_similarity_index_thread_pool_size; + unum::usearch::index_limits_t limits(roundUpToPowerOfTwoOrZero(index->size() + rows), max_thread_pool_size); + index->reserve(limits); /// Vector index creation is slooooow. Add the new rows in parallel. The threadpool is global to avoid oversubscription when multiple /// indexes are build simultaneously (e.g. multiple merges run at the same time). From baf6aaef1dc31bc17209cd1be68a860d7c01cfec Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Fri, 13 Sep 2024 11:32:33 +0000 Subject: [PATCH 13/26] fix tests --- .../0_stateless/02735_system_zookeeper_connection.reference | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02735_system_zookeeper_connection.reference b/tests/queries/0_stateless/02735_system_zookeeper_connection.reference index a2e987666e4..4eb6092137b 100644 --- a/tests/queries/0_stateless/02735_system_zookeeper_connection.reference +++ b/tests/queries/0_stateless/02735_system_zookeeper_connection.reference @@ -1,2 +1,2 @@ -default 127.0.0.1 9181 0 0 0 1 1 ['FILTERED_LIST','MULTI_READ','CHECK_NOT_EXISTS','CREATE_IF_NOT_EXISTS'] +default 127.0.0.1 9181 0 0 0 1 1 ['FILTERED_LIST','MULTI_READ','CHECK_NOT_EXISTS','CREATE_IF_NOT_EXISTS','REMOVE_RECURSIVE'] zookeeper2 localhost 9181 0 0 0 1 From 51f32450300d79736bf86513b2118e9f1398c0cc Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Fri, 13 Sep 2024 11:43:59 +0000 Subject: [PATCH 14/26] A better fix for #67476 Throw an exception instead of silently ignoring two conflicting settings. --- src/Common/ErrorCodes.cpp | 1 + src/Interpreters/executeQuery.cpp | 29 ++++++++++-------- .../02494_query_cache_bugs.reference | 20 ------------- .../0_stateless/02494_query_cache_bugs.sql | 30 +++++++------------ 4 files changed, 27 insertions(+), 53 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 1055b3d34db..09a5375191b 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -609,6 +609,7 @@ M(728, UNEXPECTED_DATA_TYPE) \ M(729, ILLEGAL_TIME_SERIES_TAGS) \ M(730, REFRESH_FAILED) \ + M(731, QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE) \ \ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index be9423852c1..3260ea890c6 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -99,6 +99,7 @@ namespace DB namespace ErrorCodes { extern const int QUERY_CACHE_USED_WITH_NONDETERMINISTIC_FUNCTIONS; + extern const int QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE; extern const int QUERY_CACHE_USED_WITH_SYSTEM_TABLE; extern const int INTO_OUTFILE_NOT_ALLOWED; extern const int INVALID_TRANSACTION; @@ -1118,22 +1119,24 @@ static std::tuple executeQueryImpl( && settings.use_query_cache && !internal && client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY - /// Bug 67476: Avoid that the query cache stores truncated results if the query ran with a non-THROW overflow mode and hit a limit. - /// This is more workaround than a fix ... unfortunately it is hard to detect from the perspective of the query cache that the - /// query result is truncated. - && (settings.read_overflow_mode == OverflowMode::THROW - && settings.read_overflow_mode_leaf == OverflowMode::THROW - && settings.group_by_overflow_mode == OverflowMode::THROW - && settings.sort_overflow_mode == OverflowMode::THROW - && settings.result_overflow_mode == OverflowMode::THROW - && settings.timeout_overflow_mode == OverflowMode::THROW - && settings.set_overflow_mode == OverflowMode::THROW - && settings.join_overflow_mode == OverflowMode::THROW - && settings.transfer_overflow_mode == OverflowMode::THROW - && settings.distinct_overflow_mode == OverflowMode::THROW) && (ast->as() || ast->as()); QueryCache::Usage query_cache_usage = QueryCache::Usage::None; + /// Bug 67476: If the query runs with a non-THROW overflow mode and hits a limit, the query cache will store a truncated result (if + /// enabled). This is incorrect. Unfortunately it is hard to detect from the perspective of the query cache that the query result + /// is truncated. Therefore throw an exception, to notify the user to disable either the query cache or use another overflow mode. + if (settings.use_query_cache && (settings.read_overflow_mode != OverflowMode::THROW + || settings.read_overflow_mode_leaf != OverflowMode::THROW + || settings.group_by_overflow_mode != OverflowMode::THROW + || settings.sort_overflow_mode != OverflowMode::THROW + || settings.result_overflow_mode != OverflowMode::THROW + || settings.timeout_overflow_mode != OverflowMode::THROW + || settings.set_overflow_mode != OverflowMode::THROW + || settings.join_overflow_mode != OverflowMode::THROW + || settings.transfer_overflow_mode != OverflowMode::THROW + || settings.distinct_overflow_mode != OverflowMode::THROW)) + throw Exception(ErrorCodes::QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE, "use_query_cache and overflow_mode != 'throw' cannot be used together"); + /// If the query runs with "use_query_cache = 1", we first probe if the query cache already contains the query result (if yes: /// return result from cache). If doesn't, we execute the query normally and write the result into the query cache. Both steps use a /// hash of the AST, the current database and the settings as cache key. Unfortunately, the settings are in some places internally diff --git a/tests/queries/0_stateless/02494_query_cache_bugs.reference b/tests/queries/0_stateless/02494_query_cache_bugs.reference index ea9017d5394..d50e9c42204 100644 --- a/tests/queries/0_stateless/02494_query_cache_bugs.reference +++ b/tests/queries/0_stateless/02494_query_cache_bugs.reference @@ -23,23 +23,3 @@ Row 1: x: 1 2 -- Bug 67476: Queries with overflow mode != throw must not be cached by the query cache -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 -0 diff --git a/tests/queries/0_stateless/02494_query_cache_bugs.sql b/tests/queries/0_stateless/02494_query_cache_bugs.sql index 423068aa646..755a5fae924 100644 --- a/tests/queries/0_stateless/02494_query_cache_bugs.sql +++ b/tests/queries/0_stateless/02494_query_cache_bugs.sql @@ -43,25 +43,15 @@ DROP TABLE IF EXISTS tab; CREATE TABLE tab(c UInt64) ENGINE = Memory; SYSTEM DROP QUERY CACHE; -SELECT sum(c) FROM tab SETTINGS read_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS read_overflow_mode_leaf = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS group_by_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS sort_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS result_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS timeout_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS set_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS join_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS transfer_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; -SELECT sum(c) FROM tab SETTINGS distinct_overflow_mode = 'break', use_query_cache = 1; -SELECT count(*) from system.query_cache; +SELECT sum(c) FROM tab SETTINGS read_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS read_overflow_mode_leaf = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS group_by_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS sort_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS result_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS timeout_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS set_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS join_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS transfer_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } +SELECT sum(c) FROM tab SETTINGS distinct_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE } SYSTEM DROP QUERY CACHE; From 6a7cfd13f7c8840617abefc40f1139b234f8ff59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Fri, 13 Sep 2024 15:25:17 +0300 Subject: [PATCH 15/26] Set PRESERVE_BLOBS if part is fetched from another replica --- src/Storages/MergeTree/MergeTreePartsMover.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp index 11a55fe2420..d81300da738 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -232,6 +232,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me cloned_part.temporary_directory_lock = data->getTemporaryPartDirectoryHolder(part->name); MutableDataPartStoragePtr cloned_part_storage; + bool preserve_blobs = false; if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication) { /// Try zero-copy replication and fallback to default copy if it's not possible @@ -259,6 +260,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me if (zero_copy_part) { /// FIXME for some reason we cannot just use this part, we have to re-create it through MergeTreeDataPartBuilder + preserve_blobs = true; zero_copy_part->is_temp = false; /// Do not remove it in dtor cloned_part_storage = zero_copy_part->getDataPartStoragePtr(); } @@ -284,7 +286,10 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me cloned_part.part->is_temp = true; /// Setting it in case connection to zookeeper is lost while moving /// Otherwise part might be stuck in the moving directory due to the KEEPER_EXCEPTION in part's destructor - cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS; + if (preserve_blobs) + cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::PRESERVE_BLOBS; + else + cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS; } cloned_part.part->loadColumnsChecksumsIndexes(true, true); cloned_part.part->loadVersionMetadata(); From 1b1db0081f66068d1bccf3d7963cb872369468f6 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Tue, 10 Sep 2024 22:39:35 +0000 Subject: [PATCH 16/26] do not fix metadata_version if replica is read_only --- src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index f28b8f9e9a8..67570d78366 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -122,6 +122,7 @@ Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil: { const String & zookeeper_path = storage.zookeeper_path; const String & replica_path = storage.replica_path; + const bool replica_readonly = storage.is_readonly; for (size_t i = 0; i != 2; ++i) { @@ -132,7 +133,7 @@ Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil: const Int32 metadata_version = parse(replica_metadata_version_str); - if (metadata_version != 0) + if (metadata_version != 0 || replica_readonly) { /// No need to fix anything return metadata_version; From 721e9a735672fad9f70150434c8ace7a9358fae3 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Fri, 13 Sep 2024 12:38:58 +0000 Subject: [PATCH 17/26] empty From b420bbf855f9126db774035b1672565484d947a2 Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Fri, 13 Sep 2024 17:17:10 +0200 Subject: [PATCH 18/26] Improve debug action --- .github/actions/debug/action.yml | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/.github/actions/debug/action.yml b/.github/actions/debug/action.yml index e1fe3f28024..d116c422e03 100644 --- a/.github/actions/debug/action.yml +++ b/.github/actions/debug/action.yml @@ -8,7 +8,7 @@ runs: shell: bash run: | echo "::group::Envs" - env + env | sort echo "::endgroup::" - name: Print Event.json shell: bash @@ -16,3 +16,23 @@ runs: echo "::group::Event.json" python3 -m json.tool "$GITHUB_EVENT_PATH" echo "::endgroup::" + - name: Pring contexts + shell: bash + run: | + cat << 'EOF' + ::group::github + ${{ toJSON(github) }} + ::endgroup:: + + ::group::runner + ${{ toJSON(runner) }} + ::endgroup:: + + ::group::env + ${{ toJSON(env) }} + ::endgroup:: + + ::group::job + ${{ toJSON(job) }} + ::endgroup:: + EOF From 418ef3f8bc9aebbc3f7e644b19b3b6c44f33e8da Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Fri, 13 Sep 2024 17:20:49 +0200 Subject: [PATCH 19/26] Use local debug action in every action --- .github/workflows/backport_branches.yml | 2 ++ .github/workflows/cherry_pick.yml | 2 ++ .github/workflows/create_release.yml | 4 ++-- .github/workflows/docker_test_images.yml | 1 + .github/workflows/jepsen.yml | 7 ++++--- .github/workflows/master.yml | 4 ++-- .github/workflows/merge_queue.yml | 4 ++-- .github/workflows/nightly.yml | 4 ++-- .github/workflows/pull_request.yml | 4 ++-- .github/workflows/release_branches.yml | 2 ++ .github/workflows/reusable_simple_job.yml | 4 ++-- 11 files changed, 23 insertions(+), 15 deletions(-) diff --git a/.github/workflows/backport_branches.yml b/.github/workflows/backport_branches.yml index 23744dc7f8f..794aca4a515 100644 --- a/.github/workflows/backport_branches.yml +++ b/.github/workflows/backport_branches.yml @@ -27,6 +27,8 @@ jobs: clear-repository: true # to ensure correct digests fetch-depth: 0 # to get version filter: tree:0 + - name: Debug Info + uses: ./.github/actions/debug - name: Labels check run: | cd "$GITHUB_WORKSPACE/tests/ci" diff --git a/.github/workflows/cherry_pick.yml b/.github/workflows/cherry_pick.yml index 8d1e2055978..315673d4abc 100644 --- a/.github/workflows/cherry_pick.yml +++ b/.github/workflows/cherry_pick.yml @@ -33,6 +33,8 @@ jobs: clear-repository: true token: ${{secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN}} fetch-depth: 0 + - name: Debug Info + uses: ./.github/actions/debug - name: Cherry pick run: | cd "$GITHUB_WORKSPACE/tests/ci" diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml index 1fb6cb60e96..b6c460ab37c 100644 --- a/.github/workflows/create_release.yml +++ b/.github/workflows/create_release.yml @@ -56,13 +56,13 @@ jobs: GH_TOKEN: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }} runs-on: [self-hosted, release-maker] steps: - - name: DebugInfo - uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6 - name: Check out repository code uses: ClickHouse/checkout@v1 with: token: ${{secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN}} fetch-depth: 0 + - name: Debug Info + uses: ./.github/actions/debug - name: Prepare Release Info shell: bash run: | diff --git a/.github/workflows/docker_test_images.yml b/.github/workflows/docker_test_images.yml index 3fe1a8883c6..2138420f378 100644 --- a/.github/workflows/docker_test_images.yml +++ b/.github/workflows/docker_test_images.yml @@ -11,6 +11,7 @@ name: Build docker images required: false type: boolean default: false + jobs: DockerBuildAarch64: runs-on: [self-hosted, style-checker-aarch64] diff --git a/.github/workflows/jepsen.yml b/.github/workflows/jepsen.yml index ecafde9e4cb..92e4ce10ade 100644 --- a/.github/workflows/jepsen.yml +++ b/.github/workflows/jepsen.yml @@ -8,27 +8,28 @@ on: # yamllint disable-line rule:truthy schedule: - cron: '0 */6 * * *' workflow_dispatch: + jobs: RunConfig: runs-on: [self-hosted, style-checker-aarch64] outputs: data: ${{ steps.runconfig.outputs.CI_DATA }} steps: - - name: DebugInfo - uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6 - name: Check out repository code uses: ClickHouse/checkout@v1 with: clear-repository: true # to ensure correct digests fetch-depth: 0 # to get version filter: tree:0 + - name: Debug Info + uses: ./.github/actions/debug - name: PrepareRunConfig id: runconfig run: | echo "::group::configure CI run" python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --configure --workflow "$GITHUB_WORKFLOW" --outfile ${{ runner.temp }}/ci_run_data.json echo "::endgroup::" - + echo "::group::CI run configure results" python3 -m json.tool ${{ runner.temp }}/ci_run_data.json echo "::endgroup::" diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 2ce1124404f..b76bbbbbdbe 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -15,14 +15,14 @@ jobs: outputs: data: ${{ steps.runconfig.outputs.CI_DATA }} steps: - - name: DebugInfo - uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6 - name: Check out repository code uses: ClickHouse/checkout@v1 with: clear-repository: true # to ensure correct digests fetch-depth: 0 # to get version filter: tree:0 + - name: Debug Info + uses: ./.github/actions/debug - name: Merge sync PR run: | cd "$GITHUB_WORKSPACE/tests/ci" diff --git a/.github/workflows/merge_queue.yml b/.github/workflows/merge_queue.yml index 629cf79770e..45ce81c2caf 100644 --- a/.github/workflows/merge_queue.yml +++ b/.github/workflows/merge_queue.yml @@ -14,14 +14,14 @@ jobs: outputs: data: ${{ steps.runconfig.outputs.CI_DATA }} steps: - - name: DebugInfo - uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6 - name: Check out repository code uses: ClickHouse/checkout@v1 with: clear-repository: true # to ensure correct digests fetch-depth: 0 # to get a version filter: tree:0 + - name: Debug Info + uses: ./.github/actions/debug - name: Cancel PR workflow run: | python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --cancel-previous-run diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 36fea39686f..1cea94e7500 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -15,14 +15,14 @@ jobs: outputs: data: ${{ steps.runconfig.outputs.CI_DATA }} steps: - - name: DebugInfo - uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6 - name: Check out repository code uses: ClickHouse/checkout@v1 with: clear-repository: true # to ensure correct digests fetch-depth: 0 # to get version filter: tree:0 + - name: Debug Info + uses: ./.github/actions/debug - name: PrepareRunConfig id: runconfig run: | diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index dbc740ebc1b..acd392978b6 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -25,14 +25,14 @@ jobs: outputs: data: ${{ steps.runconfig.outputs.CI_DATA }} steps: - - name: DebugInfo - uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6 - name: Check out repository code uses: ClickHouse/checkout@v1 with: clear-repository: true # to ensure correct digests fetch-depth: 0 # to get a version filter: tree:0 + - name: Debug Info + uses: ./.github/actions/debug - name: Cancel previous Sync PR workflow run: | python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --cancel-previous-run diff --git a/.github/workflows/release_branches.yml b/.github/workflows/release_branches.yml index ec119b6ff95..b884ebfe7a0 100644 --- a/.github/workflows/release_branches.yml +++ b/.github/workflows/release_branches.yml @@ -24,6 +24,8 @@ jobs: clear-repository: true # to ensure correct digests fetch-depth: 0 # to get version filter: tree:0 + - name: Debug Info + uses: ./.github/actions/debug - name: Labels check run: | cd "$GITHUB_WORKSPACE/tests/ci" diff --git a/.github/workflows/reusable_simple_job.yml b/.github/workflows/reusable_simple_job.yml index 4d48662ae4e..7df98d96f79 100644 --- a/.github/workflows/reusable_simple_job.yml +++ b/.github/workflows/reusable_simple_job.yml @@ -62,8 +62,6 @@ jobs: env: GITHUB_JOB_OVERRIDDEN: ${{inputs.test_name}} steps: - - name: DebugInfo - uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6 - name: Check out repository code uses: ClickHouse/checkout@v1 with: @@ -72,6 +70,8 @@ jobs: submodules: ${{inputs.submodules}} fetch-depth: ${{inputs.checkout_depth}} filter: tree:0 + - name: Debug Info + uses: ./.github/actions/debug - name: Set build envs run: | cat >> "$GITHUB_ENV" << 'EOF' From b55d0b54ea2810355723fb2edb0705fc07f4c320 Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Fri, 13 Sep 2024 17:27:31 +0200 Subject: [PATCH 20/26] Merge steps together to minimize grouping --- .github/actions/debug/action.yml | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/.github/actions/debug/action.yml b/.github/actions/debug/action.yml index d116c422e03..b45465809d2 100644 --- a/.github/actions/debug/action.yml +++ b/.github/actions/debug/action.yml @@ -4,35 +4,31 @@ description: Prints workflow debug info runs: using: "composite" steps: - - name: Print envs + - name: Envs, event.json and contexts shell: bash run: | - echo "::group::Envs" + echo '::group::Environment variables' env | sort - echo "::endgroup::" - - name: Print Event.json - shell: bash - run: | - echo "::group::Event.json" + echo '::endgroup::' + + echo '::group::event.json' python3 -m json.tool "$GITHUB_EVENT_PATH" - echo "::endgroup::" - - name: Pring contexts - shell: bash - run: | + echo '::endgroup::' + cat << 'EOF' - ::group::github + ::group::github context ${{ toJSON(github) }} ::endgroup:: - ::group::runner - ${{ toJSON(runner) }} - ::endgroup:: - - ::group::env + ::group::env context ${{ toJSON(env) }} ::endgroup:: - ::group::job + ::group::runner context + ${{ toJSON(runner) }} + ::endgroup:: + + ::group::job context ${{ toJSON(job) }} ::endgroup:: EOF From 42670a46d49719efc5caae1ca23b6d95360fd02d Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 17:00:46 +0100 Subject: [PATCH 21/26] impl --- .../QueryPlan/ReadFromMergeTree.cpp | 27 ------- .../__init__.py | 0 .../configs/remote_servers.xml | 33 +++++++++ .../test.py | 74 +++++++++++++++++++ 4 files changed, 107 insertions(+), 27 deletions(-) create mode 100644 tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py create mode 100644 tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml create mode 100644 tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 218f0a61a48..0b96cc57274 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -2009,33 +2009,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons { auto result = getAnalysisResult(); - if (is_parallel_reading_from_replicas && context->canUseParallelReplicasOnInitiator() - && context->getSettingsRef().parallel_replicas_local_plan) - { - CoordinationMode mode = CoordinationMode::Default; - switch (result.read_type) - { - case ReadFromMergeTree::ReadType::Default: - mode = CoordinationMode::Default; - break; - case ReadFromMergeTree::ReadType::InOrder: - mode = CoordinationMode::WithOrder; - break; - case ReadFromMergeTree::ReadType::InReverseOrder: - mode = CoordinationMode::ReverseOrder; - break; - case ReadFromMergeTree::ReadType::ParallelReplicas: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Read type can't be ParallelReplicas on initiator"); - } - - chassert(number_of_current_replica.has_value()); - chassert(all_ranges_callback.has_value()); - - /// initialize working set from local replica - all_ranges_callback.value()( - InitialAllRangesAnnouncement(mode, result.parts_with_ranges.getDescriptions(), number_of_current_replica.value())); - } - if (enable_remove_parts_from_snapshot_optimization) { /// Do not keep data parts in snapshot. diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml b/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml new file mode 100644 index 00000000000..734acf5f363 --- /dev/null +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml @@ -0,0 +1,33 @@ + + + + + false + + node0 + 9000 + + + node1 + 9000 + + + node2 + 9000 + + + node3 + 9000 + + + node4 + 9000 + + + node5 + 9000 + + + + + diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py new file mode 100644 index 00000000000..2b3b94f95dc --- /dev/null +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -0,0 +1,74 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +nodes = [ + cluster.add_instance( + f"node{num}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True + ) + for num in range(6) +] + + +@pytest.fixture(scope="module", autouse=True) +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def _create_tables(table_name): + for idx, node in enumerate(nodes): + node.query( + f"DROP TABLE IF EXISTS {table_name}", + settings={"database_atomic_wait_for_drop_and_detach_synchronously": True}, + ) + + node.query( + f""" + CREATE TABLE {table_name} (value Int64) + Engine=ReplicatedMergeTree('/test_parallel_replicas/shard/{table_name}', '{idx}') + ORDER BY () + """ + ) + + nodes[0].query( + f"INSERT INTO {table_name} SELECT * FROM numbers(1000)", + settings={"insert_deduplicate": 0}, + ) + nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}") + + for idx, node in enumerate(nodes): + node.query("SYSTEM STOP REPLICATED SENDS") + # the same data on all nodes except for a single value + node.query( + f"INSERT INTO {table_name} VALUES ({idx})", + settings={"insert_deduplicate": 0}, + ) + + +def test_number_of_marks_read(start_cluster): + if nodes[0].is_built_with_sanitizer(): + pytest.skip("Disabled for sanitizers (too slow)") + + table_name = "t" + _create_tables(table_name) + + for idx, node in enumerate(nodes): + expected = 499500 + idx # sum of all integers 0..999 + idx + assert ( + node.query( + "SELECT sum(value) FROM t", + settings={ + "allow_experimental_parallel_reading_from_replicas": 2, + "max_parallel_replicas": 100, + "cluster_for_parallel_replicas": "parallel_replicas", + "parallel_replicas_local_plan": True, + }, + ) + == f"{expected}\n" + ) From de78992966ac73825dd597e0f0d168b83f71360d Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 18:25:51 +0200 Subject: [PATCH 22/26] Update tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py Co-authored-by: Igor Nikonov <954088+devcrafter@users.noreply.github.com> --- .../test_parallel_replicas_snapshot_from_initiator/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py index 2b3b94f95dc..5da0a57ee7e 100644 --- a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -62,7 +62,7 @@ def test_number_of_marks_read(start_cluster): expected = 499500 + idx # sum of all integers 0..999 + idx assert ( node.query( - "SELECT sum(value) FROM t", + "SELECT sum(value) FROM {table_name}", settings={ "allow_experimental_parallel_reading_from_replicas": 2, "max_parallel_replicas": 100, From 57a6a64d8c622fc42b2e3cb16a0f9f23783bab46 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 17:31:28 +0100 Subject: [PATCH 23/26] fix --- .../test_parallel_replicas_snapshot_from_initiator/test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py index 5da0a57ee7e..ec962c7cb32 100644 --- a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -51,10 +51,9 @@ def _create_tables(table_name): ) -def test_number_of_marks_read(start_cluster): - if nodes[0].is_built_with_sanitizer(): - pytest.skip("Disabled for sanitizers (too slow)") - +# check that we use the state of data parts from the initiator node (for some sort of determinism of what is been read). +# currently it is implemented only when we build local plan for the initiator node (we aim to make this behavior default) +def test_initiator_snapshot_is_used_for_reading(start_cluster): table_name = "t" _create_tables(table_name) From 8fd9345d2d7fae6711b4733f658d330df2edffc2 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 21:26:58 +0100 Subject: [PATCH 24/26] fix --- .../test_parallel_replicas_snapshot_from_initiator/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py index ec962c7cb32..a7e7e99455b 100644 --- a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -61,7 +61,7 @@ def test_initiator_snapshot_is_used_for_reading(start_cluster): expected = 499500 + idx # sum of all integers 0..999 + idx assert ( node.query( - "SELECT sum(value) FROM {table_name}", + f"SELECT sum(value) FROM {table_name}", settings={ "allow_experimental_parallel_reading_from_replicas": 2, "max_parallel_replicas": 100, From a461d20af92278fb973ba7a86ec84fb58946ce74 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Fri, 13 Sep 2024 23:03:01 +0200 Subject: [PATCH 25/26] Masking sensitive info in gcs() table function. --- src/Parsers/FunctionSecretArgumentsFinderAST.h | 3 ++- tests/integration/test_mask_sensitive_info/test.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Parsers/FunctionSecretArgumentsFinderAST.h b/src/Parsers/FunctionSecretArgumentsFinderAST.h index 94da30922cc..15d9a8d5add 100644 --- a/src/Parsers/FunctionSecretArgumentsFinderAST.h +++ b/src/Parsers/FunctionSecretArgumentsFinderAST.h @@ -74,7 +74,8 @@ private: findMySQLFunctionSecretArguments(); } else if ((function.name == "s3") || (function.name == "cosn") || (function.name == "oss") || - (function.name == "deltaLake") || (function.name == "hudi") || (function.name == "iceberg")) + (function.name == "deltaLake") || (function.name == "hudi") || (function.name == "iceberg") || + (function.name == "gcs")) { /// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...) findS3FunctionSecretArguments(/* is_cluster_function= */ false); diff --git a/tests/integration/test_mask_sensitive_info/test.py b/tests/integration/test_mask_sensitive_info/test.py index 8d5345082ff..5366de39ea7 100644 --- a/tests/integration/test_mask_sensitive_info/test.py +++ b/tests/integration/test_mask_sensitive_info/test.py @@ -393,6 +393,7 @@ def test_table_functions(): f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')", f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", f"iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", + f"gcs('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", ] def make_test_case(i): From 37411bf240e7c94a0e9e07c645a7ec74d6758aab Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Sun, 15 Sep 2024 15:06:14 +0000 Subject: [PATCH 26/26] Fix sizing with unconstrained thread pool size --- src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp index 641770b16e9..bf9aad6545d 100644 --- a/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexVectorSimilarity.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -270,6 +271,8 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c /// Reserving space is mandatory size_t max_thread_pool_size = Context::getGlobalContextInstance()->getServerSettings().max_build_vector_similarity_index_thread_pool_size; + if (max_thread_pool_size == 0) + max_thread_pool_size = getNumberOfPhysicalCPUCores(); unum::usearch::index_limits_t limits(roundUpToPowerOfTwoOrZero(index->size() + rows), max_thread_pool_size); index->reserve(limits);