From c6b558c7915b070167649d4e88eafb2613570bd3 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Thu, 4 Jul 2024 22:30:18 +0200 Subject: [PATCH 01/13] Done --- .../02814_currentDatabase_for_table_functions.sql | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02814_currentDatabase_for_table_functions.sql b/tests/queries/0_stateless/02814_currentDatabase_for_table_functions.sql index 74b5cf5f432..8b1e3ba1e10 100644 --- a/tests/queries/0_stateless/02814_currentDatabase_for_table_functions.sql +++ b/tests/queries/0_stateless/02814_currentDatabase_for_table_functions.sql @@ -13,7 +13,13 @@ CREATE MATERIALIZED VIEW null_mv Engine = Log AS SELECT * FROM null_table LEFT J CREATE TABLE null_table_buffer (number UInt64) ENGINE = Buffer(currentDatabase(), null_table, 1, 1, 1, 100, 200, 10000, 20000); INSERT INTO null_table_buffer VALUES (1); -SELECT sleep(3) FORMAT Null; + +-- OPTIMIZE query should flush Buffer table, but still it is not guaranteed +-- (see the comment StorageBuffer::optimize) +-- But the combination of OPTIMIZE + sleep + OPTIMIZE should be enough. +OPTIMIZE TABLE null_table_buffer; +SELECT sleep(1) FORMAT Null; +OPTIMIZE TABLE null_table_buffer; -- Insert about should've landed into `null_mv` SELECT count() FROM null_mv; From ca7e003c6d7af6bf0676bba7cb61ab560c202bf3 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Fri, 12 Jul 2024 15:27:03 +0000 Subject: [PATCH 02/13] Fixed test --- .../02814_currentDatabase_for_table_functions.reference | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02814_currentDatabase_for_table_functions.reference b/tests/queries/0_stateless/02814_currentDatabase_for_table_functions.reference index 7ff95106d3d..20b14d9a67b 100644 --- a/tests/queries/0_stateless/02814_currentDatabase_for_table_functions.reference +++ b/tests/queries/0_stateless/02814_currentDatabase_for_table_functions.reference @@ -11,7 +11,12 @@ CREATE VIEW number_view as SELECT * FROM numbers(10) as tb; CREATE MATERIALIZED VIEW null_mv Engine = Log AS SELECT * FROM null_table LEFT JOIN number_view as tb USING number; CREATE TABLE null_table_buffer (number UInt64) ENGINE = Buffer(currentDatabase(), null_table, 1, 1, 1, 100, 200, 10000, 20000); INSERT INTO null_table_buffer VALUES (1); -SELECT sleep(3) FORMAT Null; +-- OPTIMIZE query should flush Buffer table, but still it is not guaranteed +-- (see the comment StorageBuffer::optimize) +-- But the combination of OPTIMIZE + sleep + OPTIMIZE should be enough. +OPTIMIZE TABLE null_table_buffer; +SELECT sleep(1) FORMAT Null; +OPTIMIZE TABLE null_table_buffer; -- Insert about should've landed into `null_mv` SELECT count() FROM null_mv; 1 From 8d7471f8bd2e0c6dc242231c4358448787e6c56f Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Sat, 20 Jul 2024 00:03:40 +0200 Subject: [PATCH 03/13] Fix for deadlock in getDDLWorker --- src/Interpreters/Context.cpp | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 94bcb88ed53..48878733a00 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -3490,18 +3490,22 @@ DDLWorker & Context::getDDLWorker() const if (shared->ddl_worker_startup_task) waitLoad(shared->ddl_worker_startup_task); // Just wait and do not prioritize, because it depends on all load and startup tasks - SharedLockGuard lock(shared->mutex); - if (!shared->ddl_worker) { - if (!hasZooKeeper()) - throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no Zookeeper configuration in server config"); - - if (!hasDistributedDDL()) - throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no DistributedDDL configuration in server config"); - - throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "DDL background thread is not initialized"); + /// Only acquire the lock for reading ddl_worker field. + /// hasZooKeeper() and hasDistributedDDL() acquire the same lock as well and double acquisition of the lock in shared mode can lead + /// to a deadlock if an exclusive lock attempt is made in the meantime by another thread. + SharedLockGuard lock(shared->mutex); + if (shared->ddl_worker) + return *shared->ddl_worker; } - return *shared->ddl_worker; + + if (!hasZooKeeper()) + throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no Zookeeper configuration in server config"); + + if (!hasDistributedDDL()) + throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no DistributedDDL configuration in server config"); + + throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "DDL background thread is not initialized"); } zkutil::ZooKeeperPtr Context::getZooKeeper() const From 0f8feff4d3806fa6f81d24184ab68bcd6e727551 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 30 Jul 2024 12:34:30 +0200 Subject: [PATCH 04/13] Add KeeperMap retries --- src/Storages/StorageKeeperMap.cpp | 132 +++++++++++++++++------------- 1 file changed, 73 insertions(+), 59 deletions(-) diff --git a/src/Storages/StorageKeeperMap.cpp b/src/Storages/StorageKeeperMap.cpp index ef157239e26..5534bb7f346 100644 --- a/src/Storages/StorageKeeperMap.cpp +++ b/src/Storages/StorageKeeperMap.cpp @@ -35,6 +35,7 @@ #include #include +#include "Common/ZooKeeper/ZooKeeperRetries.h" #include #include #include @@ -120,7 +121,7 @@ public: : SinkToStorage(header), storage(storage_), context(std::move(context_)) { auto primary_key = storage.getPrimaryKey(); - assert(primary_key.size() == 1); + chassert(primary_key.size() == 1); primary_key_pos = getHeader().getPositionByName(primary_key[0]); } @@ -171,76 +172,89 @@ public: template void finalize(bool strict) { - auto zookeeper = storage.getClient(); + const auto & settings = context->getSettingsRef(); - auto keys_limit = storage.keysLimit(); + ZooKeeperRetriesControl zk_retry{ + getName(), + getLogger(getName()), + ZooKeeperRetriesInfo{ + settings.insert_keeper_max_retries, + settings.insert_keeper_retry_initial_backoff_ms, + settings.insert_keeper_retry_max_backoff_ms}, + context->getProcessListElement()}; - size_t current_keys_num = 0; - size_t new_keys_num = 0; - - // We use keys limit as a soft limit so we ignore some cases when it can be still exceeded - // (e.g if parallel insert queries are being run) - if (keys_limit != 0) + retries_ctl.retryLoop([&]() { - Coordination::Stat data_stat; - zookeeper->get(storage.dataPath(), &data_stat); - current_keys_num = data_stat.numChildren; - } + auto zookeeper = storage.getClient(); + auto keys_limit = storage.keysLimit(); - std::vector key_paths; - key_paths.reserve(new_values.size()); - for (const auto & [key, _] : new_values) - key_paths.push_back(storage.fullPathForKey(key)); + size_t current_keys_num = 0; + size_t new_keys_num = 0; - zkutil::ZooKeeper::MultiExistsResponse results; - - if constexpr (!for_update) - { - if (!strict) - results = zookeeper->exists(key_paths); - } - - Coordination::Requests requests; - requests.reserve(key_paths.size()); - for (size_t i = 0; i < key_paths.size(); ++i) - { - auto key = fs::path(key_paths[i]).filename(); - - if constexpr (for_update) + // We use keys limit as a soft limit so we ignore some cases when it can be still exceeded + // (e.g if parallel insert queries are being run) + if (keys_limit != 0) { - int32_t version = -1; - if (strict) - version = versions.at(key); - - requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], version)); + Coordination::Stat data_stat; + zookeeper->get(storage.dataPath(), &data_stat); + current_keys_num = data_stat.numChildren; } - else + + std::vector key_paths; + key_paths.reserve(new_values.size()); + for (const auto & [key, _] : new_values) + key_paths.push_back(storage.fullPathForKey(key)); + + zkutil::ZooKeeper::MultiExistsResponse results; + + if constexpr (!for_update) { - if (!strict && results[i].error == Coordination::Error::ZOK) + if (!strict) + results = zookeeper->exists(key_paths); + } + + Coordination::Requests requests; + requests.reserve(key_paths.size()); + for (size_t i = 0; i < key_paths.size(); ++i) + { + auto key = fs::path(key_paths[i]).filename(); + + if constexpr (for_update) { - requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1)); + int32_t version = -1; + if (strict) + version = versions.at(key); + + requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], version)); } else { - requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent)); - ++new_keys_num; + if (!strict && results[i].error == Coordination::Error::ZOK) + { + requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1)); + } + else + { + requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent)); + ++new_keys_num; + } } } - } - if (new_keys_num != 0) - { - auto will_be = current_keys_num + new_keys_num; - if (keys_limit != 0 && will_be > keys_limit) - throw Exception( - ErrorCodes::LIMIT_EXCEEDED, - "Limit would be exceeded by inserting {} new key(s). Limit is {}, while the number of keys would be {}", - new_keys_num, - keys_limit, - will_be); - } + if (new_keys_num != 0) + { + auto will_be = current_keys_num + new_keys_num; + if (keys_limit != 0 && will_be > keys_limit) + throw Exception( + ErrorCodes::LIMIT_EXCEEDED, + "Limit would be exceeded by inserting {} new key(s). Limit is {}, while the number of keys would be {}", + new_keys_num, + keys_limit, + will_be); + } - zookeeper->multi(requests, /* check_session_valid */ true); + zookeeper->multi(requests, /* check_session_valid */ true); + }); } }; @@ -529,8 +543,8 @@ Pipe StorageKeeperMap::read( size_t num_keys = keys->size(); size_t num_threads = std::min(num_streams, keys->size()); - assert(num_keys <= std::numeric_limits::max()); - assert(num_threads <= std::numeric_limits::max()); + chassert(num_keys <= std::numeric_limits::max()); + chassert(num_threads <= std::numeric_limits::max()); for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) { @@ -1160,7 +1174,7 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca bool strict = local_context->getSettingsRef().keeper_map_strict_mode; - assert(commands.size() == 1); + chassert(commands.size() == 1); auto metadata_snapshot = getInMemoryMetadataPtr(); auto storage = getStorageID(); @@ -1236,7 +1250,7 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca return; } - assert(commands.front().type == MutationCommand::Type::UPDATE); + chassert(commands.front().type == MutationCommand::Type::UPDATE); if (commands.front().column_to_update_expression.contains(primary_key)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Primary key cannot be updated (cannot update column {})", primary_key); From eb129b539fce2a407182d892ce3bd00f782a5833 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 30 Jul 2024 13:46:27 +0200 Subject: [PATCH 05/13] Add tests --- src/Storages/StorageKeeperMap.cpp | 135 +++++++++++++----- src/Storages/StorageKeeperMap.h | 3 +- .../test_keeper_map_retries/__init__.py | 0 .../configs/enable_keeper_map.xml | 3 + .../configs/fault_injection.xml | 6 + .../test_keeper_map_retries/test.py | 78 ++++++++++ .../02911_backup_restore_keeper_map.sh | 15 +- 7 files changed, 194 insertions(+), 46 deletions(-) create mode 100644 tests/integration/test_keeper_map_retries/__init__.py create mode 100644 tests/integration/test_keeper_map_retries/configs/enable_keeper_map.xml create mode 100644 tests/integration/test_keeper_map_retries/configs/fault_injection.xml create mode 100644 tests/integration/test_keeper_map_retries/test.py diff --git a/src/Storages/StorageKeeperMap.cpp b/src/Storages/StorageKeeperMap.cpp index 5534bb7f346..09c21ae28f5 100644 --- a/src/Storages/StorageKeeperMap.cpp +++ b/src/Storages/StorageKeeperMap.cpp @@ -183,7 +183,7 @@ public: settings.insert_keeper_retry_max_backoff_ms}, context->getProcessListElement()}; - retries_ctl.retryLoop([&]() + zk_retry.retryLoop([&]() { auto zookeeper = storage.getClient(); auto keys_limit = storage.keysLimit(); @@ -205,12 +205,12 @@ public: for (const auto & [key, _] : new_values) key_paths.push_back(storage.fullPathForKey(key)); - zkutil::ZooKeeper::MultiExistsResponse results; + zkutil::ZooKeeper::MultiTryGetResponse results; if constexpr (!for_update) { if (!strict) - results = zookeeper->exists(key_paths); + results = zookeeper->tryGet(key_paths); } Coordination::Requests requests; @@ -231,7 +231,8 @@ public: { if (!strict && results[i].error == Coordination::Error::ZOK) { - requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1)); + if (results[i].data != new_values[key]) + requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1)); } else { @@ -241,6 +242,9 @@ public: } } + if (requests.empty()) + return; + if (new_keys_num != 0) { auto will_be = current_keys_num + new_keys_num; @@ -259,7 +263,7 @@ public: }; template -class StorageKeeperMapSource : public ISource +class StorageKeeperMapSource : public ISource, WithContext { const StorageKeeperMap & storage; size_t max_block_size; @@ -290,8 +294,15 @@ public: KeyContainerPtr container_, KeyContainerIter begin_, KeyContainerIter end_, - bool with_version_column_) - : ISource(getHeader(header, with_version_column_)), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_) + bool with_version_column_, + ContextPtr context_) + : ISource(getHeader(header, with_version_column_)) + , WithContext(std::move(context_)) + , storage(storage_) + , max_block_size(max_block_size_) + , container(std::move(container_)) + , it(begin_) + , end(end_) , with_version_column(with_version_column_) { } @@ -316,12 +327,12 @@ public: for (auto & raw_key : raw_keys) raw_key = base64Encode(raw_key, /* url_encoding */ true); - return storage.getBySerializedKeys(raw_keys, nullptr, with_version_column); + return storage.getBySerializedKeys(raw_keys, nullptr, with_version_column, getContext()); } else { size_t elem_num = std::min(max_block_size, static_cast(end - it)); - auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr, with_version_column); + auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr, with_version_column, getContext()); it += elem_num; return chunk; } @@ -553,14 +564,31 @@ Pipe StorageKeeperMap::read( using KeyContainer = typename KeyContainerPtr::element_type; pipes.emplace_back(std::make_shared>( - *this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end, with_version_column)); + *this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end, with_version_column, context_)); } return Pipe::unitePipes(std::move(pipes)); }; - auto client = getClient(); if (all_scan) - return process_keys(std::make_shared>(client->getChildren(zk_data_path))); + { + const auto & settings = context_->getSettingsRef(); + ZooKeeperRetriesControl zk_retry{ + getName(), + getLogger(getName()), + ZooKeeperRetriesInfo{ + settings.keeper_max_retries, + settings.keeper_retry_initial_backoff_ms, + settings.keeper_retry_max_backoff_ms}, + context_->getProcessListElement()}; + + std::vector children; + zk_retry.retryLoop([&] + { + auto client = getClient(); + children = client->getChildren(zk_data_path); + }); + return process_keys(std::make_shared>(std::move(children))); + } return process_keys(std::move(filtered_keys)); } @@ -571,11 +599,24 @@ SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const Storage return std::make_shared(*this, metadata_snapshot->getSampleBlock(), local_context); } -void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) +void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &) { checkTable(); - auto client = getClient(); - client->tryRemoveChildrenRecursive(zk_data_path, true); + const auto & settings = local_context->getSettingsRef(); + ZooKeeperRetriesControl zk_retry{ + getName(), + getLogger(getName()), + ZooKeeperRetriesInfo{ + settings.keeper_max_retries, + settings.keeper_retry_initial_backoff_ms, + settings.keeper_retry_max_backoff_ms}, + local_context->getProcessListElement()}; + + zk_retry.retryLoop([&] + { + auto client = getClient(); + client->tryRemoveChildrenRecursive(zk_data_path, true); + }); } bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock) @@ -1064,10 +1105,11 @@ Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPOD if (raw_keys.size() != keys[0].column->size()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Assertion failed: {} != {}", raw_keys.size(), keys[0].column->size()); - return getBySerializedKeys(raw_keys, &null_map, /* version_column */ false); + return getBySerializedKeys(raw_keys, &null_map, /* version_column */ false, getContext()); } -Chunk StorageKeeperMap::getBySerializedKeys(const std::span keys, PaddedPODArray * null_map, bool with_version) const +Chunk StorageKeeperMap::getBySerializedKeys( + const std::span keys, PaddedPODArray * null_map, bool with_version, const ContextPtr & local_context) const { Block sample_block = getInMemoryMetadataPtr()->getSampleBlock(); MutableColumns columns = sample_block.cloneEmptyColumns(); @@ -1084,17 +1126,27 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span k null_map->resize_fill(keys.size(), 1); } - auto client = getClient(); - Strings full_key_paths; full_key_paths.reserve(keys.size()); for (const auto & key : keys) - { full_key_paths.emplace_back(fullPathForKey(key)); - } - auto values = client->tryGet(full_key_paths); + const auto & settings = local_context->getSettingsRef(); + ZooKeeperRetriesControl zk_retry{ + getName(), + getLogger(getName()), + ZooKeeperRetriesInfo{ + settings.keeper_max_retries, + settings.keeper_retry_initial_backoff_ms, + settings.keeper_retry_max_backoff_ms}, + local_context->getProcessListElement()}; + + zkutil::ZooKeeper::MultiTryGetResponse values; + zk_retry.retryLoop([&]{ + auto client = getClient(); + values = client->tryGet(full_key_paths); + }); for (size_t i = 0; i < keys.size(); ++i) { @@ -1182,16 +1234,16 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca if (commands.front().type == MutationCommand::Type::DELETE) { - MutationsInterpreter::Settings settings(true); - settings.return_all_columns = true; - settings.return_mutated_rows = true; + MutationsInterpreter::Settings mutation_settings(true); + mutation_settings.return_all_columns = true; + mutation_settings.return_mutated_rows = true; auto interpreter = std::make_unique( storage_ptr, metadata_snapshot, commands, local_context, - settings); + mutation_settings); auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute()); PullingPipelineExecutor executor(pipeline); @@ -1200,8 +1252,6 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca auto primary_key_pos = header.getPositionByName(primary_key); auto version_position = header.getPositionByName(std::string{version_column_name}); - auto client = getClient(); - Block block; while (executor.pull(block)) { @@ -1229,7 +1279,23 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca } Coordination::Responses responses; - auto status = client->tryMulti(delete_requests, responses, /* check_session_valid */ true); + + const auto & settings = local_context->getSettingsRef(); + ZooKeeperRetriesControl zk_retry{ + getName(), + getLogger(getName()), + ZooKeeperRetriesInfo{ + settings.keeper_max_retries, + settings.keeper_retry_initial_backoff_ms, + settings.keeper_retry_max_backoff_ms}, + local_context->getProcessListElement()}; + + Coordination::Error status; + zk_retry.retryLoop([&] + { + auto client = getClient(); + status = client->tryMulti(delete_requests, responses, /* check_session_valid */ true); + }); if (status == Coordination::Error::ZOK) return; @@ -1241,9 +1307,14 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca for (const auto & delete_request : delete_requests) { - auto code = client->tryRemove(delete_request->getPath()); - if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE) - throw zkutil::KeeperException::fromPath(code, delete_request->getPath()); + zk_retry.retryLoop([&] + { + auto client = getClient(); + status = client->tryRemove(delete_request->getPath()); + }); + + if (status != Coordination::Error::ZOK && status != Coordination::Error::ZNONODE) + throw zkutil::KeeperException::fromPath(status, delete_request->getPath()); } } diff --git a/src/Storages/StorageKeeperMap.h b/src/Storages/StorageKeeperMap.h index d4556792c48..cfbb35ab2fe 100644 --- a/src/Storages/StorageKeeperMap.h +++ b/src/Storages/StorageKeeperMap.h @@ -54,7 +54,8 @@ public: Names getPrimaryKey() const override { return {primary_key}; } Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray & null_map, const Names &) const override; - Chunk getBySerializedKeys(std::span keys, PaddedPODArray * null_map, bool with_version) const; + Chunk getBySerializedKeys( + std::span keys, PaddedPODArray * null_map, bool with_version, const ContextPtr & local_context) const; Block getSampleBlock(const Names &) const override; diff --git a/tests/integration/test_keeper_map_retries/__init__.py b/tests/integration/test_keeper_map_retries/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_keeper_map_retries/configs/enable_keeper_map.xml b/tests/integration/test_keeper_map_retries/configs/enable_keeper_map.xml new file mode 100644 index 00000000000..b4cbb6a954b --- /dev/null +++ b/tests/integration/test_keeper_map_retries/configs/enable_keeper_map.xml @@ -0,0 +1,3 @@ + + /test_keeper_map + diff --git a/tests/integration/test_keeper_map_retries/configs/fault_injection.xml b/tests/integration/test_keeper_map_retries/configs/fault_injection.xml new file mode 100644 index 00000000000..145945c7c7c --- /dev/null +++ b/tests/integration/test_keeper_map_retries/configs/fault_injection.xml @@ -0,0 +1,6 @@ + + + 0.05 + 0.05 + + diff --git a/tests/integration/test_keeper_map_retries/test.py b/tests/integration/test_keeper_map_retries/test.py new file mode 100644 index 00000000000..352119147cd --- /dev/null +++ b/tests/integration/test_keeper_map_retries/test.py @@ -0,0 +1,78 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +import os + +CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs") + +cluster = ClickHouseCluster(__file__) + +node = cluster.add_instance( + "node", + main_configs=["configs/enable_keeper_map.xml"], + with_zookeeper=True, + stay_alive=True, +) + + +def start_clean_clickhouse(): + # remove fault injection if present + if "fault_injection.xml" in node.exec_in_container( + ["bash", "-c", "ls /etc/clickhouse-server/config.d"] + ): + print("Removing fault injection") + node.exec_in_container( + ["bash", "-c", "rm /etc/clickhouse-server/config.d/fault_injection.xml"] + ) + node.restart_clickhouse() + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def repeat_query(query, repeat): + for _ in range(repeat): + node.query( + query, + settings={ + "keeper_max_retries": 20, + "keeper_retry_max_backoff_ms": 10000, + }, + ) + + +def test_queries(started_cluster): + start_clean_clickhouse() + + node.query("DROP TABLE IF EXISTS keeper_map_retries SYNC") + node.query( + "CREATE TABLE keeper_map_retries (a UInt64, b UInt64) Engine=KeeperMap('/keeper_map_retries') PRIMARY KEY a" + ) + + node.stop_clickhouse() + node.copy_file_to_container( + os.path.join(CONFIG_DIR, "fault_injection.xml"), + "/etc/clickhouse-server/config.d/fault_injection.xml", + ) + node.start_clickhouse() + + repeat_count = 10 + + repeat_query( + "INSERT INTO keeper_map_retries SELECT number, number FROM numbers(500)", + repeat_count, + ) + repeat_query("SELECT * FROM keeper_map_retries", repeat_count) + repeat_query( + "ALTER TABLE keeper_map_retries UPDATE b = 3 WHERE a > 2", repeat_count + ) + repeat_query("ALTER TABLE keeper_map_retries DELETE WHERE a > 2", repeat_count) + repeat_query("TRUNCATE keeper_map_retries", repeat_count) diff --git a/tests/queries/0_stateless/02911_backup_restore_keeper_map.sh b/tests/queries/0_stateless/02911_backup_restore_keeper_map.sh index ee070b40f6f..c04667505c3 100755 --- a/tests/queries/0_stateless/02911_backup_restore_keeper_map.sh +++ b/tests/queries/0_stateless/02911_backup_restore_keeper_map.sh @@ -13,20 +13,9 @@ $CLICKHOUSE_CLIENT -nm -q " CREATE TABLE $database_name.02911_backup_restore_keeper_map3 (key UInt64, value String) Engine=KeeperMap('/' || currentDatabase() || '/test02911_different') PRIMARY KEY key; " -# KeeperMap table engine doesn't have internal retries for interaction with Keeper. Do it on our own, otherwise tests with overloaded server can be flaky. -while true -do - $CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map2 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 5000; - " 2>&1 | grep -q "KEEPER_EXCEPTION" && sleep 1 && continue - break -done +$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map2 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 5000;" -while true -do - $CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map3 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 3000; - " 2>&1 | grep -q "KEEPER_EXCEPTION" && sleep 1 && continue - break -done +$CLICKHOUSE_CLIENT -nm -q "INSERT INTO $database_name.02911_backup_restore_keeper_map3 SELECT number, 'test' || toString(number) FROM system.numbers LIMIT 3000;" backup_path="$database_name" for i in $(seq 1 3); do From fd26672864a7e1557908b878d7daa018de20c61a Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 30 Jul 2024 13:54:30 +0200 Subject: [PATCH 06/13] Revert some change --- src/Storages/StorageKeeperMap.cpp | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/Storages/StorageKeeperMap.cpp b/src/Storages/StorageKeeperMap.cpp index 09c21ae28f5..1559b442e43 100644 --- a/src/Storages/StorageKeeperMap.cpp +++ b/src/Storages/StorageKeeperMap.cpp @@ -35,7 +35,6 @@ #include #include -#include "Common/ZooKeeper/ZooKeeperRetries.h" #include #include #include @@ -44,6 +43,7 @@ #include #include #include +#include #include #include @@ -205,12 +205,12 @@ public: for (const auto & [key, _] : new_values) key_paths.push_back(storage.fullPathForKey(key)); - zkutil::ZooKeeper::MultiTryGetResponse results; + zkutil::ZooKeeper::MultiExistsResponse results; if constexpr (!for_update) { if (!strict) - results = zookeeper->tryGet(key_paths); + results = zookeeper->exists(key_paths); } Coordination::Requests requests; @@ -231,8 +231,7 @@ public: { if (!strict && results[i].error == Coordination::Error::ZOK) { - if (results[i].data != new_values[key]) - requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1)); + requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1)); } else { @@ -242,9 +241,6 @@ public: } } - if (requests.empty()) - return; - if (new_keys_num != 0) { auto will_be = current_keys_num + new_keys_num; From 7d5c30e76cf0fd17515803fec96899f4aad1294e Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 30 Jul 2024 16:26:19 +0200 Subject: [PATCH 07/13] No retries when partitioned --- tests/integration/test_keeper_map/test.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_keeper_map/test.py b/tests/integration/test_keeper_map/test.py index 31316af7b1e..7aee5df5746 100644 --- a/tests/integration/test_keeper_map/test.py +++ b/tests/integration/test_keeper_map/test.py @@ -46,7 +46,11 @@ def assert_keeper_exception_after_partition(query): with PartitionManager() as pm: pm.drop_instance_zk_connections(node) try: - error = node.query_and_get_error_with_retry(query, sleep_time=1) + error = node.query_and_get_error_with_retry( + query, + sleep_time=1, + settings={"insert_keeper_max_retries": 1, "keeper_max_retries": 1}, + ) assert "Coordination::Exception" in error except: print_iptables_rules() @@ -84,7 +88,9 @@ def test_keeper_map_without_zk(started_cluster): node.restart_clickhouse(60) try: error = node.query_and_get_error_with_retry( - "SELECT * FROM test_keeper_map_without_zk", sleep_time=1 + "SELECT * FROM test_keeper_map_without_zk", + sleep_time=1, + settings={"keeper_max_retries": 1}, ) assert "Failed to activate table because of connection issues" in error except: From 6b7c5eb5da1be1fc31d4ebfd4f0dfa0c6a6e728c Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 31 Jul 2024 14:09:07 +0200 Subject: [PATCH 08/13] Fix drop --- src/Storages/StorageKeeperMap.cpp | 34 +++++++++++++++-------- src/Storages/StorageKeeperMap.h | 21 ++++++++++---- tests/integration/test_keeper_map/test.py | 5 ++-- 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/src/Storages/StorageKeeperMap.cpp b/src/Storages/StorageKeeperMap.cpp index 1559b442e43..0634c7be6ee 100644 --- a/src/Storages/StorageKeeperMap.cpp +++ b/src/Storages/StorageKeeperMap.cpp @@ -79,6 +79,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int LIMIT_EXCEEDED; extern const int CANNOT_RESTORE_TABLE; + extern const int INVALID_STATE; } namespace @@ -497,7 +498,7 @@ StorageKeeperMap::StorageKeeperMap( } - table_is_valid = true; + table_status = TableStatus::VALID; /// we are the first table created for the specified Keeper path, i.e. we are the first replica return; } @@ -656,7 +657,18 @@ bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::E void StorageKeeperMap::drop() { - checkTable(); + auto current_table_status = getTableStatus(); + if (current_table_status == TableStatus::UNKNOWN) + { + static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated " + "once a connection is established and metadata is verified"; + throw Exception(ErrorCodes::INVALID_STATE, error_msg); + } + + /// if only column metadata is wrong we can still drop the table correctly + if (current_table_status == TableStatus::INVALID_KEEPER_STRUCTURE) + return; + auto client = getClient(); // we allow ZNONODE in case we got hardware error on previous drop @@ -1017,11 +1029,11 @@ UInt64 StorageKeeperMap::keysLimit() const return keys_limit; } -std::optional StorageKeeperMap::isTableValid() const +StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus() const { std::lock_guard lock{init_mutex}; - if (table_is_valid.has_value()) - return table_is_valid; + if (table_status != TableStatus::UNKNOWN) + return table_status; [&] { @@ -1034,7 +1046,7 @@ std::optional StorageKeeperMap::isTableValid() const if (metadata_stat.numChildren == 0) { - table_is_valid = false; + table_status = TableStatus::INVALID_KEEPER_STRUCTURE; return; } @@ -1045,7 +1057,7 @@ std::optional StorageKeeperMap::isTableValid() const "Table definition does not match to the one stored in the path {}. Stored definition: {}", zk_root_path, stored_metadata_string); - table_is_valid = false; + table_status = TableStatus::INVALID_METADATA; return; } @@ -1058,7 +1070,7 @@ std::optional StorageKeeperMap::isTableValid() const Coordination::Responses responses; client->tryMulti(requests, responses); - table_is_valid = false; + table_status = TableStatus::INVALID_KEEPER_STRUCTURE; if (responses[0]->error != Coordination::Error::ZOK) { LOG_ERROR(log, "Table node ({}) is missing", zk_table_path); @@ -1077,18 +1089,18 @@ std::optional StorageKeeperMap::isTableValid() const return; } - table_is_valid = true; + table_status = TableStatus::VALID; } catch (const Coordination::Exception & e) { tryLogCurrentException(log); if (!Coordination::isHardwareError(e.code)) - table_is_valid = false; + table_status = TableStatus::INVALID_KEEPER_STRUCTURE; } }(); - return table_is_valid; + return table_status; } Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray & null_map, const Names &) const diff --git a/src/Storages/StorageKeeperMap.h b/src/Storages/StorageKeeperMap.h index cfbb35ab2fe..8ed348a4f6f 100644 --- a/src/Storages/StorageKeeperMap.h +++ b/src/Storages/StorageKeeperMap.h @@ -80,8 +80,8 @@ public: template void checkTable() const { - auto is_table_valid = isTableValid(); - if (!is_table_valid.has_value()) + auto current_table_status = getTableStatus(); + if (table_status == TableStatus::UNKNOWN) { static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated " "once a connection is established and metadata is verified"; @@ -94,10 +94,10 @@ public: } } - if (!*is_table_valid) + if (current_table_status != TableStatus::VALID) { static constexpr auto error_msg - = "Failed to activate table because of invalid metadata in ZooKeeper. Please DETACH table"; + = "Failed to activate table because of invalid metadata in ZooKeeper. Please DROP/DETACH table"; if constexpr (throw_on_error) throw Exception(ErrorCodes::INVALID_STATE, error_msg); else @@ -111,7 +111,15 @@ public: private: bool dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock); - std::optional isTableValid() const; + enum class TableStatus : uint8_t + { + UNKNOWN, + INVALID_METADATA, + INVALID_KEEPER_STRUCTURE, + VALID + }; + + TableStatus getTableStatus() const; void restoreDataImpl( const BackupPtr & backup, @@ -143,7 +151,8 @@ private: mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr}; mutable std::mutex init_mutex; - mutable std::optional table_is_valid; + + mutable TableStatus table_status{TableStatus::UNKNOWN}; LoggerPtr log; }; diff --git a/tests/integration/test_keeper_map/test.py b/tests/integration/test_keeper_map/test.py index 7aee5df5746..4b1bcd11cfe 100644 --- a/tests/integration/test_keeper_map/test.py +++ b/tests/integration/test_keeper_map/test.py @@ -67,6 +67,7 @@ def run_query(query): def test_keeper_map_without_zk(started_cluster): + run_query("DROP TABLE IF EXISTS test_keeper_map_without_zk SYNC") assert_keeper_exception_after_partition( "CREATE TABLE test_keeper_map_without_zk (key UInt64, value UInt64) ENGINE = KeeperMap('/test_keeper_map_without_zk') PRIMARY KEY(key);" ) @@ -107,12 +108,12 @@ def test_keeper_map_without_zk(started_cluster): ) assert "Failed to activate table because of invalid metadata in ZooKeeper" in error - node.query("DETACH TABLE test_keeper_map_without_zk") - client.stop() def test_keeper_map_with_failed_drop(started_cluster): + run_query("DROP TABLE IF EXISTS test_keeper_map_with_failed_drop SYNC") + run_query("DROP TABLE IF EXISTS test_keeper_map_with_failed_drop_another SYNC") run_query( "CREATE TABLE test_keeper_map_with_failed_drop (key UInt64, value UInt64) ENGINE = KeeperMap('/test_keeper_map_with_failed_drop') PRIMARY KEY(key);" ) From 6e914ff6da67be1c1381ffed2d04b5758704baf3 Mon Sep 17 00:00:00 2001 From: Thom O'Connor Date: Wed, 31 Jul 2024 21:59:37 +0000 Subject: [PATCH 09/13] Update settings.md Removing duplicate header "## background_merges_mutations_scheduling_policy" --- docs/en/operations/server-configuration-parameters/settings.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index 8278f8c8699..a1e3c292b04 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -103,8 +103,6 @@ Default: 2 The policy on how to perform a scheduling for background merges and mutations. Possible values are: `round_robin` and `shortest_task_first`. -## background_merges_mutations_scheduling_policy - Algorithm used to select next merge or mutation to be executed by background thread pool. Policy may be changed at runtime without server restart. Could be applied from the `default` profile for backward compatibility. From c2df527a32d640f52296ea7aefae177e22504082 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 1 Aug 2024 08:42:54 +0200 Subject: [PATCH 10/13] Reduce fault rate --- .../test_keeper_map_retries/configs/fault_injection.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_keeper_map_retries/configs/fault_injection.xml b/tests/integration/test_keeper_map_retries/configs/fault_injection.xml index 145945c7c7c..0933b6b3031 100644 --- a/tests/integration/test_keeper_map_retries/configs/fault_injection.xml +++ b/tests/integration/test_keeper_map_retries/configs/fault_injection.xml @@ -1,6 +1,6 @@ - 0.05 - 0.05 + 0.005 + 0.005 From 7db4065898633ace1f909711d4caeda8d135cace Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 1 Aug 2024 10:30:50 +0200 Subject: [PATCH 11/13] Add retries to create --- src/Storages/StorageKeeperMap.cpp | 373 +++++++++++------- src/Storages/StorageKeeperMap.h | 6 +- .../configs/keeper_retries.xml | 14 + tests/integration/test_keeper_map/test.py | 3 +- .../configs/fault_injection.xml | 1 + .../configs/keeper_retries.xml | 14 + .../test_keeper_map_retries/test.py | 13 +- 7 files changed, 275 insertions(+), 149 deletions(-) create mode 100644 tests/integration/test_keeper_map/configs/keeper_retries.xml create mode 100644 tests/integration/test_keeper_map_retries/configs/keeper_retries.xml diff --git a/src/Storages/StorageKeeperMap.cpp b/src/Storages/StorageKeeperMap.cpp index 0634c7be6ee..a6be9f8da04 100644 --- a/src/Storages/StorageKeeperMap.cpp +++ b/src/Storages/StorageKeeperMap.cpp @@ -408,104 +408,192 @@ StorageKeeperMap::StorageKeeperMap( if (attach) { - checkTable(); + checkTable(context_); return; } - auto client = getClient(); + const auto & settings = context_->getSettingsRef(); + ZooKeeperRetriesControl zk_retry{ + getName(), + getLogger(getName()), + ZooKeeperRetriesInfo{settings.keeper_max_retries, settings.keeper_retry_initial_backoff_ms, settings.keeper_retry_max_backoff_ms}, + context_->getProcessListElement()}; - if (zk_root_path != "/" && !client->exists(zk_root_path)) - { - LOG_TRACE(log, "Creating root path {}", zk_root_path); - client->createAncestors(zk_root_path); - client->createIfNotExists(zk_root_path, ""); - } + zk_retry.retryLoop( + [&] + { + auto client = getClient(); + if (zk_root_path != "/" && !client->exists(zk_root_path)) + { + LOG_TRACE(log, "Creating root path {}", zk_root_path); + client->createAncestors(zk_root_path); + client->createIfNotExists(zk_root_path, ""); + } + }); + + std::shared_ptr metadata_drop_lock; + int32_t drop_lock_version = -1; for (size_t i = 0; i < 1000; ++i) { - std::string stored_metadata_string; - auto exists = client->tryGet(zk_metadata_path, stored_metadata_string); - - if (exists) - { - // this requires same name for columns - // maybe we can do a smarter comparison for columns and primary key expression - if (stored_metadata_string != metadata_string) - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Path {} is already used but the stored table definition doesn't match. Stored metadata: {}", - zk_root_path, - stored_metadata_string); - - auto code = client->tryCreate(zk_table_path, "", zkutil::CreateMode::Persistent); - - /// A table on the same Keeper path already exists, we just appended our table id to subscribe as a new replica - /// We still don't know if the table matches the expected metadata so table_is_valid is not changed - /// It will be checked lazily on the first operation - if (code == Coordination::Error::ZOK) - return; - - if (code != Coordination::Error::ZNONODE) - throw zkutil::KeeperException(code, "Failed to create table on path {} because a table with same UUID already exists", zk_root_path); - - /// ZNONODE means we dropped zk_tables_path but didn't finish drop completely - } - - if (client->exists(zk_dropped_path)) - { - LOG_INFO(log, "Removing leftover nodes"); - auto code = client->tryCreate(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral); - - if (code == Coordination::Error::ZNONODE) + bool success = false; + zk_retry.retryLoop( + [&] { - LOG_INFO(log, "Someone else removed leftover nodes"); - } - else if (code == Coordination::Error::ZNODEEXISTS) - { - LOG_INFO(log, "Someone else is removing leftover nodes"); - continue; - } - else if (code != Coordination::Error::ZOK) - { - throw Coordination::Exception::fromPath(code, zk_dropped_lock_path); - } - else - { - auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client); - if (!dropTable(client, metadata_drop_lock)) - continue; - } - } + auto client = getClient(); + std::string stored_metadata_string; + auto exists = client->tryGet(zk_metadata_path, stored_metadata_string); - Coordination::Requests create_requests - { - zkutil::makeCreateRequest(zk_metadata_path, metadata_string, zkutil::CreateMode::Persistent), - zkutil::makeCreateRequest(zk_data_path, metadata_string, zkutil::CreateMode::Persistent), - zkutil::makeCreateRequest(zk_tables_path, "", zkutil::CreateMode::Persistent), - zkutil::makeCreateRequest(zk_table_path, "", zkutil::CreateMode::Persistent), - }; + if (exists) + { + // this requires same name for columns + // maybe we can do a smarter comparison for columns and primary key expression + if (stored_metadata_string != metadata_string) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Path {} is already used but the stored table definition doesn't match. Stored metadata: {}", + zk_root_path, + stored_metadata_string); - Coordination::Responses create_responses; - auto code = client->tryMulti(create_requests, create_responses); - if (code == Coordination::Error::ZNODEEXISTS) - { - LOG_INFO(log, "It looks like a table on path {} was created by another server at the same moment, will retry", zk_root_path); - continue; - } - else if (code != Coordination::Error::ZOK) - { - zkutil::KeeperMultiException::check(code, create_requests, create_responses); - } + auto code = client->tryCreate(zk_table_path, "", zkutil::CreateMode::Persistent); + /// A table on the same Keeper path already exists, we just appended our table id to subscribe as a new replica + /// We still don't know if the table matches the expected metadata so table_is_valid is not changed + /// It will be checked lazily on the first operation + if (code == Coordination::Error::ZOK) + { + success = true; + return; + } - table_status = TableStatus::VALID; - /// we are the first table created for the specified Keeper path, i.e. we are the first replica - return; + /// We most likely created the path but got a timeout or disconnect + if (code == Coordination::Error::ZNODEEXISTS && zk_retry.isRetry()) + { + success = true; + return; + } + + if (code != Coordination::Error::ZNONODE) + throw zkutil::KeeperException( + code, "Failed to create table on path {} because a table with same UUID already exists", zk_root_path); + + /// ZNONODE means we dropped zk_tables_path but didn't finish drop completely + } + + if (client->exists(zk_dropped_path)) + { + LOG_INFO(log, "Removing leftover nodes"); + + bool drop_finished = false; + if (zk_retry.isRetry() && metadata_drop_lock != nullptr && drop_lock_version != -1) + { + /// if we have leftover lock from previous try, we need to recreate the ephemeral with our session + Coordination::Requests drop_lock_requests{ + zkutil::makeRemoveRequest(zk_dropped_lock_path, drop_lock_version), + zkutil::makeCreateRequest(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral), + }; + + Coordination::Responses drop_lock_responses; + auto lock_code = client->tryMulti(drop_lock_requests, drop_lock_responses); + if (lock_code == Coordination::Error::ZBADVERSION) + { + LOG_INFO(log, "Someone else is removing leftover nodes"); + metadata_drop_lock->setAlreadyRemoved(); + metadata_drop_lock.reset(); + return; + } + + if (drop_lock_responses[0]->error == Coordination::Error::ZNONODE) + { + /// someone else removed metadata nodes or the previous ephemeral node expired + /// we will try creating dropped lock again to make sure + metadata_drop_lock->setAlreadyRemoved(); + metadata_drop_lock.reset(); + } + else if (lock_code == Coordination::Error::ZOK) + { + metadata_drop_lock->setAlreadyRemoved(); + metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client); + drop_lock_version = -1; + Coordination::Stat lock_stat; + client->get(zk_dropped_lock_path, &lock_stat); + drop_lock_version = lock_stat.version; + if (!dropTable(client, metadata_drop_lock)) + { + metadata_drop_lock.reset(); + return; + } + drop_finished = true; + } + } + + if (!drop_finished) + { + auto code = client->tryCreate(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral); + + if (code == Coordination::Error::ZNONODE) + { + LOG_INFO(log, "Someone else removed leftover nodes"); + } + else if (code == Coordination::Error::ZNODEEXISTS) + { + LOG_INFO(log, "Someone else is removing leftover nodes"); + return; + } + else if (code != Coordination::Error::ZOK) + { + throw Coordination::Exception::fromPath(code, zk_dropped_lock_path); + } + else + { + metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client); + drop_lock_version = -1; + Coordination::Stat lock_stat; + client->get(zk_dropped_lock_path, &lock_stat); + drop_lock_version = lock_stat.version; + if (!dropTable(client, metadata_drop_lock)) + { + metadata_drop_lock.reset(); + return; + } + } + } + } + + Coordination::Requests create_requests{ + zkutil::makeCreateRequest(zk_metadata_path, metadata_string, zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest(zk_data_path, metadata_string, zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest(zk_tables_path, "", zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest(zk_table_path, "", zkutil::CreateMode::Persistent), + }; + + Coordination::Responses create_responses; + auto code = client->tryMulti(create_requests, create_responses); + if (code == Coordination::Error::ZNODEEXISTS) + { + LOG_INFO( + log, "It looks like a table on path {} was created by another server at the same moment, will retry", zk_root_path); + return; + } + else if (code != Coordination::Error::ZOK) + { + zkutil::KeeperMultiException::check(code, create_requests, create_responses); + } + + table_status = TableStatus::VALID; + /// we are the first table created for the specified Keeper path, i.e. we are the first replica + success = true; + }); + + if (success) + return; } - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Cannot create metadata for table, because it is removed concurrently or because " - "of wrong zk_root_path ({})", zk_root_path); + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Cannot create metadata for table, because it is removed concurrently or because " + "of wrong zk_root_path ({})", + zk_root_path); } @@ -518,7 +606,7 @@ Pipe StorageKeeperMap::read( size_t max_block_size, size_t num_streams) { - checkTable(); + checkTable(context_); storage_snapshot->check(column_names); FieldVectorPtr filtered_keys; @@ -592,13 +680,13 @@ Pipe StorageKeeperMap::read( SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) { - checkTable(); + checkTable(local_context); return std::make_shared(*this, metadata_snapshot->getSampleBlock(), local_context); } void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &) { - checkTable(); + checkTable(local_context); const auto & settings = local_context->getSettingsRef(); ZooKeeperRetriesControl zk_retry{ getName(), @@ -657,7 +745,7 @@ bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::E void StorageKeeperMap::drop() { - auto current_table_status = getTableStatus(); + auto current_table_status = getTableStatus(getContext()); if (current_table_status == TableStatus::UNKNOWN) { static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated " @@ -666,7 +754,7 @@ void StorageKeeperMap::drop() } /// if only column metadata is wrong we can still drop the table correctly - if (current_table_status == TableStatus::INVALID_KEEPER_STRUCTURE) + if (current_table_status == TableStatus::INVALID_METADATA) return; auto client = getClient(); @@ -1029,7 +1117,7 @@ UInt64 StorageKeeperMap::keysLimit() const return keys_limit; } -StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus() const +StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus(const ContextPtr & local_context) const { std::lock_guard lock{init_mutex}; if (table_status != TableStatus::UNKNOWN) @@ -1039,57 +1127,70 @@ StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus() const { try { - auto client = getClient(); + const auto & settings = local_context->getSettingsRef(); + ZooKeeperRetriesControl zk_retry{ + getName(), + getLogger(getName()), + ZooKeeperRetriesInfo{ + settings.keeper_max_retries, + settings.keeper_retry_initial_backoff_ms, + settings.keeper_retry_max_backoff_ms}, + local_context->getProcessListElement()}; - Coordination::Stat metadata_stat; - auto stored_metadata_string = client->get(zk_metadata_path, &metadata_stat); - - if (metadata_stat.numChildren == 0) + zk_retry.retryLoop([&] { + auto client = getClient(); + + Coordination::Stat metadata_stat; + auto stored_metadata_string = client->get(zk_metadata_path, &metadata_stat); + + if (metadata_stat.numChildren == 0) + { + table_status = TableStatus::INVALID_KEEPER_STRUCTURE; + return; + } + + if (metadata_string != stored_metadata_string) + { + LOG_ERROR( + log, + "Table definition does not match to the one stored in the path {}. Stored definition: {}", + zk_root_path, + stored_metadata_string); + table_status = TableStatus::INVALID_METADATA; + return; + } + + // validate all metadata and data nodes are present + Coordination::Requests requests; + requests.push_back(zkutil::makeCheckRequest(zk_table_path, -1)); + requests.push_back(zkutil::makeCheckRequest(zk_data_path, -1)); + requests.push_back(zkutil::makeCheckRequest(zk_dropped_path, -1)); + + Coordination::Responses responses; + client->tryMulti(requests, responses); + table_status = TableStatus::INVALID_KEEPER_STRUCTURE; - return; - } + if (responses[0]->error != Coordination::Error::ZOK) + { + LOG_ERROR(log, "Table node ({}) is missing", zk_table_path); + return; + } - if (metadata_string != stored_metadata_string) - { - LOG_ERROR( - log, - "Table definition does not match to the one stored in the path {}. Stored definition: {}", - zk_root_path, - stored_metadata_string); - table_status = TableStatus::INVALID_METADATA; - return; - } + if (responses[1]->error != Coordination::Error::ZOK) + { + LOG_ERROR(log, "Data node ({}) is missing", zk_data_path); + return; + } - // validate all metadata and data nodes are present - Coordination::Requests requests; - requests.push_back(zkutil::makeCheckRequest(zk_table_path, -1)); - requests.push_back(zkutil::makeCheckRequest(zk_data_path, -1)); - requests.push_back(zkutil::makeCheckRequest(zk_dropped_path, -1)); + if (responses[2]->error == Coordination::Error::ZOK) + { + LOG_ERROR(log, "Tables with root node {} are being dropped", zk_root_path); + return; + } - Coordination::Responses responses; - client->tryMulti(requests, responses); - - table_status = TableStatus::INVALID_KEEPER_STRUCTURE; - if (responses[0]->error != Coordination::Error::ZOK) - { - LOG_ERROR(log, "Table node ({}) is missing", zk_table_path); - return; - } - - if (responses[1]->error != Coordination::Error::ZOK) - { - LOG_ERROR(log, "Data node ({}) is missing", zk_data_path); - return; - } - - if (responses[2]->error == Coordination::Error::ZOK) - { - LOG_ERROR(log, "Tables with root node {} are being dropped", zk_root_path); - return; - } - - table_status = TableStatus::VALID; + table_status = TableStatus::VALID; + }); } catch (const Coordination::Exception & e) { @@ -1227,7 +1328,7 @@ void StorageKeeperMap::checkMutationIsPossible(const MutationCommands & commands void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr local_context) { - checkTable(); + checkTable(local_context); if (commands.empty()) return; diff --git a/src/Storages/StorageKeeperMap.h b/src/Storages/StorageKeeperMap.h index 8ed348a4f6f..1464eeaabad 100644 --- a/src/Storages/StorageKeeperMap.h +++ b/src/Storages/StorageKeeperMap.h @@ -78,9 +78,9 @@ public: UInt64 keysLimit() const; template - void checkTable() const + void checkTable(const ContextPtr & local_context) const { - auto current_table_status = getTableStatus(); + auto current_table_status = getTableStatus(local_context); if (table_status == TableStatus::UNKNOWN) { static constexpr auto error_msg = "Failed to activate table because of connection issues. It will be activated " @@ -119,7 +119,7 @@ private: VALID }; - TableStatus getTableStatus() const; + TableStatus getTableStatus(const ContextPtr & context) const; void restoreDataImpl( const BackupPtr & backup, diff --git a/tests/integration/test_keeper_map/configs/keeper_retries.xml b/tests/integration/test_keeper_map/configs/keeper_retries.xml new file mode 100644 index 00000000000..43e5b9a09e8 --- /dev/null +++ b/tests/integration/test_keeper_map/configs/keeper_retries.xml @@ -0,0 +1,14 @@ + + + + 0 + 0 + + + + + + default + + + diff --git a/tests/integration/test_keeper_map/test.py b/tests/integration/test_keeper_map/test.py index 4b1bcd11cfe..861a7c47687 100644 --- a/tests/integration/test_keeper_map/test.py +++ b/tests/integration/test_keeper_map/test.py @@ -10,6 +10,7 @@ cluster = ClickHouseCluster(__file__) node = cluster.add_instance( "node", main_configs=["configs/enable_keeper_map.xml"], + user_configs=["configs/keeper_retries.xml"], with_zookeeper=True, stay_alive=True, ) @@ -49,7 +50,6 @@ def assert_keeper_exception_after_partition(query): error = node.query_and_get_error_with_retry( query, sleep_time=1, - settings={"insert_keeper_max_retries": 1, "keeper_max_retries": 1}, ) assert "Coordination::Exception" in error except: @@ -91,7 +91,6 @@ def test_keeper_map_without_zk(started_cluster): error = node.query_and_get_error_with_retry( "SELECT * FROM test_keeper_map_without_zk", sleep_time=1, - settings={"keeper_max_retries": 1}, ) assert "Failed to activate table because of connection issues" in error except: diff --git a/tests/integration/test_keeper_map_retries/configs/fault_injection.xml b/tests/integration/test_keeper_map_retries/configs/fault_injection.xml index 0933b6b3031..8406b7db785 100644 --- a/tests/integration/test_keeper_map_retries/configs/fault_injection.xml +++ b/tests/integration/test_keeper_map_retries/configs/fault_injection.xml @@ -1,5 +1,6 @@ + 1 0.005 0.005 diff --git a/tests/integration/test_keeper_map_retries/configs/keeper_retries.xml b/tests/integration/test_keeper_map_retries/configs/keeper_retries.xml new file mode 100644 index 00000000000..208dd6e47fa --- /dev/null +++ b/tests/integration/test_keeper_map_retries/configs/keeper_retries.xml @@ -0,0 +1,14 @@ + + + + 20 + 10000 + + + + + + default + + + diff --git a/tests/integration/test_keeper_map_retries/test.py b/tests/integration/test_keeper_map_retries/test.py index 352119147cd..c6760e5d1a2 100644 --- a/tests/integration/test_keeper_map_retries/test.py +++ b/tests/integration/test_keeper_map_retries/test.py @@ -11,6 +11,7 @@ cluster = ClickHouseCluster(__file__) node = cluster.add_instance( "node", main_configs=["configs/enable_keeper_map.xml"], + user_configs=["configs/keeper_retries.xml"], with_zookeeper=True, stay_alive=True, ) @@ -42,10 +43,6 @@ def repeat_query(query, repeat): for _ in range(repeat): node.query( query, - settings={ - "keeper_max_retries": 20, - "keeper_retry_max_backoff_ms": 10000, - }, ) @@ -53,10 +50,6 @@ def test_queries(started_cluster): start_clean_clickhouse() node.query("DROP TABLE IF EXISTS keeper_map_retries SYNC") - node.query( - "CREATE TABLE keeper_map_retries (a UInt64, b UInt64) Engine=KeeperMap('/keeper_map_retries') PRIMARY KEY a" - ) - node.stop_clickhouse() node.copy_file_to_container( os.path.join(CONFIG_DIR, "fault_injection.xml"), @@ -66,6 +59,10 @@ def test_queries(started_cluster): repeat_count = 10 + node.query( + "CREATE TABLE keeper_map_retries (a UInt64, b UInt64) Engine=KeeperMap('/keeper_map_retries') PRIMARY KEY a", + ) + repeat_query( "INSERT INTO keeper_map_retries SELECT number, number FROM numbers(500)", repeat_count, From c77f6d78d976430faf4353e350d0205bbecf2837 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 1 Aug 2024 12:09:58 +0200 Subject: [PATCH 12/13] Update minio --- tests/integration/compose/docker_compose_minio.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/compose/docker_compose_minio.yml b/tests/integration/compose/docker_compose_minio.yml index 4255a529f6d..40098d05b04 100644 --- a/tests/integration/compose/docker_compose_minio.yml +++ b/tests/integration/compose/docker_compose_minio.yml @@ -2,7 +2,7 @@ version: '2.3' services: minio1: - image: minio/minio:RELEASE.2023-09-30T07-02-29Z + image: minio/minio:RELEASE.2024-07-31T05-46-26Z volumes: - data1-1:/data1 - ${MINIO_CERTS_DIR:-}:/certs From 048fbacc40062b05510916134c9d9525e7fab63a Mon Sep 17 00:00:00 2001 From: Tyler Hannan Date: Thu, 1 Aug 2024 16:48:19 +0200 Subject: [PATCH 13/13] Update README.md --- README.md | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 98f9108f14c..2120a4d1211 100644 --- a/README.md +++ b/README.md @@ -34,17 +34,13 @@ curl https://clickhouse.com/ | sh Every month we get together with the community (users, contributors, customers, those interested in learning more about ClickHouse) to discuss what is coming in the latest release. If you are interested in sharing what you've built on ClickHouse, let us know. -* [v24.7 Community Call](https://clickhouse.com/company/events/v24-7-community-release-call) - Jul 30 +* [v24.8 Community Call](https://clickhouse.com/company/events/v24-8-community-release-call) - August 29 ## Upcoming Events Keep an eye out for upcoming meetups and events around the world. Somewhere else you want us to be? Please feel free to reach out to tyler `` clickhouse `` com. You can also peruse [ClickHouse Events](https://clickhouse.com/company/news-events) for a list of all upcoming trainings, meetups, speaking engagements, etc. -* [ClickHouse Meetup in Paris](https://www.meetup.com/clickhouse-france-user-group/events/300783448/) - Jul 9 -* [ClickHouse Cloud - Live Update Call](https://clickhouse.com/company/events/202407-cloud-update-live) - Jul 9 -* [ClickHouse Meetup @ Ramp - New York City](https://www.meetup.com/clickhouse-new-york-user-group/events/300595845/) - Jul 9 -* [AWS Summit in New York](https://clickhouse.com/company/events/2024-07-awssummit-nyc) - Jul 10 -* [ClickHouse Meetup @ Klaviyo - Boston](https://www.meetup.com/clickhouse-boston-user-group/events/300907870) - Jul 11 +* MORE COMING SOON! ## Recent Recordings * **Recent Meetup Videos**: [Meetup Playlist](https://www.youtube.com/playlist?list=PL0Z2YDlm0b3iNDUzpY1S3L_iV4nARda_U) Whenever possible recordings of the ClickHouse Community Meetups are edited and presented as individual talks. Current featuring "Modern SQL in 2023", "Fast, Concurrent, and Consistent Asynchronous INSERTS in ClickHouse", and "Full-Text Indices: Design and Experiments"