From 5c4da392593876391e1e6a4e7b60b9f0f61c911e Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Sat, 30 Nov 2024 22:36:19 +0100 Subject: [PATCH 1/2] Add retries while creating a replicated table. --- src/Backups/RestorerFromBackup.cpp | 13 + src/Backups/RestorerFromBackup.h | 3 +- src/Databases/DatabaseOrdinary.cpp | 2 +- src/Interpreters/InterpreterCreateQuery.cpp | 36 +-- src/Interpreters/InterpreterSystemQuery.cpp | 11 +- .../ReplicatedMergeTreeAttachThread.cpp | 11 +- .../MergeTree/registerStorageMergeTree.cpp | 15 +- src/Storages/StorageReplicatedMergeTree.cpp | 244 ++++++++++++++---- src/Storages/StorageReplicatedMergeTree.h | 40 ++- 9 files changed, 291 insertions(+), 84 deletions(-) diff --git a/src/Backups/RestorerFromBackup.cpp b/src/Backups/RestorerFromBackup.cpp index f907d80a64a..9b3b2408706 100644 --- a/src/Backups/RestorerFromBackup.cpp +++ b/src/Backups/RestorerFromBackup.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -38,6 +39,9 @@ namespace DB { namespace Setting { + extern const SettingsUInt64 backup_restore_keeper_retry_initial_backoff_ms; + extern const SettingsUInt64 backup_restore_keeper_retry_max_backoff_ms; + extern const SettingsUInt64 backup_restore_keeper_max_retries; extern const SettingsSeconds lock_acquire_timeout; } @@ -102,6 +106,10 @@ RestorerFromBackup::RestorerFromBackup( , after_task_callback(after_task_callback_) , create_table_timeout(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000)) , log(getLogger("RestorerFromBackup")) + , zookeeper_retries_info( + context->getSettingsRef()[Setting::backup_restore_keeper_max_retries], + context->getSettingsRef()[Setting::backup_restore_keeper_retry_initial_backoff_ms], + context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms]) , tables_dependencies("RestorerFromBackup") , thread_pool(thread_pool_) { @@ -976,6 +984,11 @@ void RestorerFromBackup::createTable(const QualifiedTableName & table_name) query_context->setSetting("database_replicated_allow_explicit_uuid", 3); query_context->setSetting("database_replicated_allow_replicated_engine_arguments", 3); + /// Creating of replicated tables may need retries. + query_context->setSetting("keeper_max_retries", zookeeper_retries_info.max_retries); + query_context->setSetting("keeper_initial_backoff_ms", zookeeper_retries_info.initial_backoff_ms); + query_context->setSetting("keeper_max_backoff_ms", zookeeper_retries_info.max_backoff_ms); + /// Execute CREATE TABLE query (we call IDatabase::createTableRestoredFromBackup() to allow the database to do some /// database-specific things). 
database->createTableRestoredFromBackup( diff --git a/src/Backups/RestorerFromBackup.h b/src/Backups/RestorerFromBackup.h index aa5288643a0..3fb314e5c42 100644 --- a/src/Backups/RestorerFromBackup.h +++ b/src/Backups/RestorerFromBackup.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -83,6 +84,7 @@ private: std::chrono::milliseconds create_table_timeout; LoggerPtr log; + const ZooKeeperRetriesInfo zookeeper_retries_info; Mode mode = Mode::RESTORE; Strings all_hosts; DDLRenamingMap renaming_map; @@ -170,7 +172,6 @@ private: TablesDependencyGraph tables_dependencies TSA_GUARDED_BY(mutex); std::vector data_restore_tasks TSA_GUARDED_BY(mutex); std::unique_ptr access_restorer TSA_GUARDED_BY(mutex); - bool access_restored TSA_GUARDED_BY(mutex) = false; std::vector> futures TSA_GUARDED_BY(mutex); std::atomic exception_caught = false; diff --git a/src/Databases/DatabaseOrdinary.cpp b/src/Databases/DatabaseOrdinary.cpp index 3dbfd5f222d..eed29c0d821 100644 --- a/src/Databases/DatabaseOrdinary.cpp +++ b/src/Databases/DatabaseOrdinary.cpp @@ -408,7 +408,7 @@ void DatabaseOrdinary::restoreMetadataAfterConvertingToReplicated(StoragePtr tab } else { - rmt->restoreMetadataInZooKeeper(); + rmt->restoreMetadataInZooKeeper(/* query_status = */ nullptr, /* zookeeper_retries_info = */ {}); LOG_INFO ( log, diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 9e81871579d..e8add48a4c2 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -1633,29 +1633,29 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) if (isReplicated(*inner_table_engine)) is_storage_replicated = true; } - } + } - bool allow_heavy_populate = getContext()->getSettingsRef()[Setting::database_replicated_allow_heavy_create] && create.is_populate; - if (!allow_heavy_populate && database && database->getEngineName() == "Replicated" && (create.select || create.is_populate)) + bool allow_heavy_populate = getContext()->getSettingsRef()[Setting::database_replicated_allow_heavy_create] && create.is_populate; + if (!allow_heavy_populate && database && database->getEngineName() == "Replicated" && (create.select || create.is_populate)) + { + const bool allow_create_select_for_replicated + = (create.isView() && !create.is_populate) || create.is_create_empty || !is_storage_replicated; + if (!allow_create_select_for_replicated) { - const bool allow_create_select_for_replicated - = (create.isView() && !create.is_populate) || create.is_create_empty || !is_storage_replicated; - if (!allow_create_select_for_replicated) - { - /// POPULATE can be enabled with setting, provide hint in error message - if (create.is_populate) - throw Exception( - ErrorCodes::SUPPORT_IS_DISABLED, - "CREATE with POPULATE is not supported with Replicated databases. Consider using separate CREATE and INSERT " - "queries. " - "Alternatively, you can enable 'database_replicated_allow_heavy_create' setting to allow this operation, use with " - "caution"); - + /// POPULATE can be enabled with setting, provide hint in error message + if (create.is_populate) throw Exception( ErrorCodes::SUPPORT_IS_DISABLED, - "CREATE AS SELECT is not supported with Replicated databases. Consider using separate CREATE and INSERT queries."); - } + "CREATE with POPULATE is not supported with Replicated databases. Consider using separate CREATE and INSERT " + "queries. 
" + "Alternatively, you can enable 'database_replicated_allow_heavy_create' setting to allow this operation, use with " + "caution"); + + throw Exception( + ErrorCodes::SUPPORT_IS_DISABLED, + "CREATE AS SELECT is not supported with Replicated databases. Consider using separate CREATE and INSERT queries."); } + } if (create.is_clone_as) { diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 9aec16a3fb7..ca96ee3245f 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -98,6 +98,9 @@ namespace DB { namespace Setting { + extern const SettingsUInt64 keeper_max_retries; + extern const SettingsUInt64 keeper_retry_initial_backoff_ms; + extern const SettingsUInt64 keeper_retry_max_backoff_ms; extern const SettingsSeconds lock_acquire_timeout; extern const SettingsSeconds receive_timeout; extern const SettingsMaxThreads max_threads; @@ -878,7 +881,13 @@ void InterpreterSystemQuery::restoreReplica() if (table_replicated_ptr == nullptr) throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs()); - table_replicated_ptr->restoreMetadataInZooKeeper(); + const auto & settings = getContext()->getSettingsRef(); + + table_replicated_ptr->restoreMetadataInZooKeeper( + getContext()->getProcessListElementSafe(), + ZooKeeperRetriesInfo{settings[Setting::keeper_max_retries], + settings[Setting::keeper_retry_initial_backoff_ms], + settings[Setting::keeper_retry_max_backoff_ms]}); } StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index c258048354e..c654b459c24 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -166,23 +166,24 @@ void ReplicatedMergeTreeAttachThread::runImpl() /// Just in case it was not removed earlier due to connection loss zookeeper->tryRemove(replica_path + "/flags/force_restore_data"); - storage.checkTableStructure(replica_path, metadata_snapshot); + /// Here `zookeeper_retries_info = {}` because the attach thread has its own retries (see ReplicatedMergeTreeAttachThread::run()). + storage.checkTableStructure(replica_path, metadata_snapshot, /* metadata_version = */ nullptr, /* strict_check = */ true, /* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); storage.checkParts(skip_sanity_checks); /// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart), /// don't allow to reinitialize them, delete each of them immediately. 
storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"}); - storage.createNewZooKeeperNodes(); - storage.syncPinnedPartUUIDs(); + storage.createNewZooKeeperNodes(/* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); + storage.syncPinnedPartUUIDs(/* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); std::lock_guard lock(storage.table_shared_id_mutex); - storage.createTableSharedID(); + storage.createTableSharedID(/* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); }; void ReplicatedMergeTreeAttachThread::finalizeInitialization() TSA_NO_THREAD_SAFETY_ANALYSIS { - storage.startupImpl(/* from_attach_thread */ true); + storage.startupImpl(/* from_attach_thread */ true, /* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); storage.initialization_done = true; LOG_INFO(log, "Table is initialized"); } diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index 9f66a079998..12b5d115903 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -37,6 +38,9 @@ namespace Setting extern const SettingsBool allow_suspicious_ttl_expressions; extern const SettingsBool create_table_empty_primary_key_by_default; extern const SettingsUInt64 database_replicated_allow_replicated_engine_arguments; + extern const SettingsUInt64 keeper_max_retries; + extern const SettingsUInt64 keeper_retry_initial_backoff_ms; + extern const SettingsUInt64 keeper_retry_max_backoff_ms; } namespace MergeTreeSetting @@ -831,6 +835,12 @@ static StoragePtr create(const StorageFactory::Arguments & args) if (auto txn = args.getLocalContext()->getZooKeeperMetadataTransaction()) need_check_table_structure = txn->isInitialQuery(); + ZooKeeperRetriesInfo create_query_zk_retries_info; + create_query_zk_retries_info.max_retries = local_settings[Setting::keeper_max_retries]; + create_query_zk_retries_info.initial_backoff_ms = local_settings[Setting::keeper_retry_initial_backoff_ms]; + create_query_zk_retries_info.max_backoff_ms = local_settings[Setting::keeper_retry_max_backoff_ms]; + auto create_query_status = args.getLocalContext()->getProcessListElementSafe(); + return std::make_shared( zookeeper_info, args.mode, @@ -841,8 +851,11 @@ static StoragePtr create(const StorageFactory::Arguments & args) date_column_name, merging_params, std::move(storage_settings), - need_check_table_structure); + need_check_table_structure, + create_query_zk_retries_info, + create_query_status); } + return std::make_shared( args.table_id, args.relative_data_path, diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index bd476625081..ac1b37ecbdb 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -366,7 +366,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( const String & date_column_name, const MergingParams & merging_params_, std::unique_ptr settings_, - bool need_check_structure) + bool need_check_structure, + const ZooKeeperRetriesInfo & create_query_zookeeper_retries_info_, + QueryStatusPtr create_query_status_) : MergeTreeData(table_id_, metadata_, context_, @@ -380,6 +382,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( , zookeeper_path(zookeeper_info.path) , replica_name(zookeeper_info.replica_name) , 
replica_path(fs::path(zookeeper_path) / "replicas" / replica_name) + , create_query_zookeeper_retries_info(create_query_zookeeper_retries_info_) + , create_query_status(create_query_status_) , reader(*this) , writer(*this) , merger_mutator(*this) @@ -569,7 +573,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( try { - bool is_first_replica = createTableIfNotExists(metadata_snapshot); + bool is_first_replica = createTableIfNotExists(metadata_snapshot, getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); try { @@ -578,24 +582,22 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( /// We have to check granularity on other replicas. If it's fixed we /// must create our new replica with fixed granularity and store this /// information in /replica/metadata. - other_replicas_fixed_granularity = checkFixedGranularityInZookeeper(); + other_replicas_fixed_granularity = checkFixedGranularityInZookeeper(getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); /// Allow structure mismatch for secondary queries from Replicated database. /// It may happen if the table was altered just after creation. /// Metadata will be updated in cloneMetadataIfNeeded(...), metadata_version will be 0 for a while. - bool same_structure = checkTableStructure(zookeeper_path, metadata_snapshot, need_check_structure); + int32_t metadata_version; + bool same_structure = checkTableStructure(zookeeper_path, metadata_snapshot, &metadata_version, need_check_structure, getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); if (same_structure) { - Coordination::Stat metadata_stat; - current_zookeeper->get(fs::path(zookeeper_path) / "metadata", &metadata_stat); - /** We change metadata_snapshot so that `createReplica` method will create `metadata_version` node in ZooKeeper * with version of table '/metadata' node in Zookeeper. * * Otherwise `metadata_version` for not first replica will be initialized with 0 by default. */ - setInMemoryMetadata(metadata_snapshot->withMetadataVersion(metadata_stat.version)); + setInMemoryMetadata(metadata_snapshot->withMetadataVersion(metadata_version)); metadata_snapshot = getInMemoryMetadataPtr(); } } @@ -607,15 +609,13 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } if (!is_first_replica) - createReplica(metadata_snapshot); + createReplica(metadata_snapshot, getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); - createNewZooKeeperNodes(); - syncPinnedPartUUIDs(); + createNewZooKeeperNodes(getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); + syncPinnedPartUUIDs(getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); if (!has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper) - createTableSharedID(); - - + createTableSharedID(getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); } catch (...) 
{ @@ -628,12 +628,29 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } -bool StorageReplicatedMergeTree::checkFixedGranularityInZookeeper() +bool StorageReplicatedMergeTree::checkFixedGranularityInZookeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const { - auto zookeeper = getZooKeeper(); - String metadata_str = zookeeper->get(zookeeper_path + "/metadata"); - auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str); - return metadata_from_zk.index_granularity_bytes == 0; + bool fixed_granularity = false; + + auto check_fixed_granularity = [&] + { + auto zookeeper = getZooKeeper(); + String metadata_str = zookeeper->get(zookeeper_path + "/metadata"); + auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str); + fixed_granularity = (metadata_from_zk.index_granularity_bytes == 0); + }; + + if (zookeeper_retries_info.max_retries > 0) + { + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::checkFixedGranularityInZookeeper", log.load(), zookeeper_retries_info, process_list_element}; + retries_ctl.retryLoop([&] { check_fixed_granularity(); }); + } + else + { + check_fixed_granularity(); + } + + return fixed_granularity; } @@ -808,7 +825,20 @@ std::vector getAncestors(const String & path) } -void StorageReplicatedMergeTree::createNewZooKeeperNodes() +void StorageReplicatedMergeTree::createNewZooKeeperNodes(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const +{ + if (zookeeper_retries_info.max_retries > 0) + { + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createNewZooKeeperNodes", log.load(), zookeeper_retries_info, process_list_element}; + retries_ctl.retryLoop([&] { createNewZooKeeperNodesAttempt(); }); + } + else + { + createNewZooKeeperNodesAttempt(); + } +} + +void StorageReplicatedMergeTree::createNewZooKeeperNodesAttempt() const { auto zookeeper = getZooKeeper(); @@ -873,14 +903,34 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes() } } +bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot, + const ZooKeeperRetriesInfo & zookeeper_retries_info, + QueryStatusPtr process_list_element) const +{ + bool table_created = false; + if (zookeeper_retries_info.max_retries > 0) + { + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createTableIfNotExists", log.load(), zookeeper_retries_info, process_list_element}; + retries_ctl.retryLoop([&] { table_created = createTableIfNotExistsAttempt(metadata_snapshot, process_list_element); }); + } + else + { + table_created = createTableIfNotExistsAttempt(metadata_snapshot, process_list_element); + } + return table_created; +} -bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot) +bool StorageReplicatedMergeTree::createTableIfNotExistsAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const { auto zookeeper = getZooKeeper(); zookeeper->createAncestors(zookeeper_path); for (size_t i = 0; i < 1000; ++i) { + /// Check if the query was cancelled. + if (process_list_element) + process_list_element->checkTimeLimit(); + /// Invariant: "replicas" does not exist if there is no table or if there are leftovers from incompletely dropped table. 
if (zookeeper->exists(zookeeper_path + "/replicas")) { @@ -1019,7 +1069,22 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr "of wrong zookeeper_path or because of logical error"); } -void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metadata_snapshot) +void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metadata_snapshot, + const ZooKeeperRetriesInfo & zookeeper_retries_info, + QueryStatusPtr process_list_element) const +{ + if (zookeeper_retries_info.max_retries > 0) + { + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createReplica", log.load(), zookeeper_retries_info, process_list_element}; + retries_ctl.retryLoop([&] { createReplicaAttempt(metadata_snapshot, process_list_element); }); + } + else + { + createReplicaAttempt(metadata_snapshot, process_list_element); + } +} + +void StorageReplicatedMergeTree::createReplicaAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const { auto zookeeper = getZooKeeper(); @@ -1103,6 +1168,10 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada do { + /// Check if the query was cancelled. + if (process_list_element) + process_list_element->checkTimeLimit(); + Coordination::Stat replicas_stat; String replicas_value; @@ -1169,6 +1238,25 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada } while (code == Coordination::Error::ZBADVERSION); } +ZooKeeperRetriesInfo StorageReplicatedMergeTree::getCreateQueryZooKeeperRetriesInfo() const +{ + std::lock_guard lock{create_query_zookeeper_retries_info_mutex}; + return create_query_zookeeper_retries_info; +} + +QueryStatusPtr StorageReplicatedMergeTree::getCreateQueryStatus() const +{ + std::lock_guard lock{create_query_zookeeper_retries_info_mutex}; + return create_query_status; +} + +void StorageReplicatedMergeTree::clearCreateQueryZooKeeperRetriesInfo() +{ + std::lock_guard lock{create_query_zookeeper_retries_info_mutex}; + create_query_zookeeper_retries_info = {}; + create_query_status = {}; +} + zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperIfTableShutDown() const { @@ -1530,7 +1618,26 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper /** Verify that list of columns and table storage_settings_ptr match those specified in ZK (/metadata). * If not, throw an exception. 
*/ -bool StorageReplicatedMergeTree::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, bool strict_check) +bool StorageReplicatedMergeTree::checkTableStructure( + const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check, + const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const +{ + bool same_structure = false; + if (zookeeper_retries_info.max_retries > 0) + { + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::checkTableStructure", log.load(), zookeeper_retries_info, process_list_element}; + retries_ctl.retryLoop([&] { same_structure = checkTableStructureAttempt(zookeeper_prefix, metadata_snapshot, metadata_version, strict_check); }); + } + else + { + same_structure = checkTableStructureAttempt(zookeeper_prefix, metadata_snapshot, metadata_version, strict_check); + } + return same_structure; +} + + +bool StorageReplicatedMergeTree::checkTableStructureAttempt( + const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check) const { auto zookeeper = getZooKeeper(); @@ -1541,6 +1648,9 @@ bool StorageReplicatedMergeTree::checkTableStructure(const String & zookeeper_pr auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str); old_metadata.checkEquals(metadata_from_zk, metadata_snapshot->getColumns(), getContext()); + if (metadata_version) + *metadata_version = metadata_stat.version; + Coordination::Stat columns_stat; auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns", &columns_stat)); @@ -1860,21 +1970,35 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks) } -void StorageReplicatedMergeTree::syncPinnedPartUUIDs() +void StorageReplicatedMergeTree::syncPinnedPartUUIDs(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) { - auto zookeeper = getZooKeeper(); + String new_pinned_part_uuids_str; + Coordination::Stat new_stat; - Coordination::Stat stat; - String s = zookeeper->get(zookeeper_path + "/pinned_part_uuids", &stat); + auto read_pinned_part_uuids = [&] + { + auto zookeeper = getZooKeeper(); + new_pinned_part_uuids_str = zookeeper->get(zookeeper_path + "/pinned_part_uuids", &new_stat); + }; + + if (zookeeper_retries_info.max_retries > 0) + { + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::syncPinnedPartUUIDs", log.load(), zookeeper_retries_info, process_list_element}; + retries_ctl.retryLoop([&] { read_pinned_part_uuids(); }); + } + else + { + read_pinned_part_uuids(); + } std::lock_guard lock(pinned_part_uuids_mutex); /// Unsure whether or not this can be called concurrently. 
- if (pinned_part_uuids->stat.version < stat.version) + if (pinned_part_uuids->stat.version < new_stat.version) { auto new_pinned_part_uuids = std::make_shared(); - new_pinned_part_uuids->fromString(s); - new_pinned_part_uuids->stat = stat; + new_pinned_part_uuids->fromString(new_pinned_part_uuids_str); + new_pinned_part_uuids->stat = new_stat; pinned_part_uuids = new_pinned_part_uuids; } @@ -2228,7 +2352,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) case LogEntry::ALTER_METADATA: return executeMetadataAlter(entry); case LogEntry::SYNC_PINNED_PART_UUIDS: - syncPinnedPartUUIDs(); + syncPinnedPartUUIDs(/* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); return true; case LogEntry::CLONE_PART_FROM_SHARD: executeClonePartFromShard(entry); @@ -4377,17 +4501,29 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n } -void StorageReplicatedMergeTree::startBeingLeader() +void StorageReplicatedMergeTree::startBeingLeader(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) { - auto zookeeper = getZooKeeper(); - if (!(*getSettings())[MergeTreeSetting::replicated_can_become_leader]) { LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0"); return; } - zkutil::checkNoOldLeaders(log.load(), *zookeeper, fs::path(zookeeper_path) / "leader_election"); + auto start_being_leader = [&] + { + auto zookeeper = getZooKeeper(); + zkutil::checkNoOldLeaders(log.load(), *zookeeper, fs::path(zookeeper_path) / "leader_election"); + }; + + if (zookeeper_retries_info.max_retries > 0) + { + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::startBeingLeader", log.load(), zookeeper_retries_info, process_list_element}; + retries_ctl.retryLoop([&] { start_being_leader(); }); + } + else + { + start_being_leader(); + } LOG_INFO(log, "Became leader"); is_leader = true; @@ -5260,10 +5396,10 @@ void StorageReplicatedMergeTree::startup() return; } - startupImpl(/* from_attach_thread */ false); + startupImpl(/* from_attach_thread */ false, getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); } -void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread) +void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread, const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) { /// Do not start replication if ZooKeeper is not configured or there is no metadata in zookeeper if (!has_metadata_in_zookeeper.has_value() || !*has_metadata_in_zookeeper) @@ -5292,7 +5428,7 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread) getContext()->getInterserverIOHandler().addEndpoint( data_parts_exchange_ptr->getId(getEndpointName()), data_parts_exchange_ptr); - startBeingLeader(); + startBeingLeader(zookeeper_retries_info, process_list_element); if (from_attach_thread) { @@ -5330,6 +5466,9 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread) startBackgroundMovesIfNeeded(); part_moves_between_shards_orchestrator.start(); + + /// After finishing startup() create_query_zk_retries_info won't be used anymore. + clearCreateQueryZooKeeperRetriesInfo(); } catch (...) 
{ @@ -6544,7 +6683,7 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition( return true; } -void StorageReplicatedMergeTree::restoreMetadataInZooKeeper() +void StorageReplicatedMergeTree::restoreMetadataInZooKeeper(QueryStatusPtr query_status, const ZooKeeperRetriesInfo & zookeeper_retries_info) { LOG_INFO(log, "Restoring replica metadata"); @@ -6587,14 +6726,14 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper() LOG_INFO(log, "Moved all parts to detached/"); - const bool is_first_replica = createTableIfNotExists(metadata_snapshot); + const bool is_first_replica = createTableIfNotExists(metadata_snapshot, zookeeper_retries_info, query_status); LOG_INFO(log, "Created initial ZK nodes, replica is first: {}", is_first_replica); if (!is_first_replica) - createReplica(metadata_snapshot); + createReplica(metadata_snapshot, zookeeper_retries_info, query_status); - createNewZooKeeperNodes(); + createNewZooKeeperNodes(zookeeper_retries_info, query_status); LOG_INFO(log, "Created ZK nodes for table"); @@ -6606,7 +6745,7 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper() LOG_INFO(log, "Attached all partitions, starting table"); - startupImpl(/* from_attach_thread */ false); + startupImpl(/* from_attach_thread */ false, zookeeper_retries_info, query_status); } void StorageReplicatedMergeTree::dropPartNoWaitNoThrow(const String & part_name) @@ -8785,7 +8924,7 @@ void StorageReplicatedMergeTree::movePartitionToShard( { /// Optimistic check that for compatible destination table structure. - checkTableStructure(to, getInMemoryMetadataPtr()); + checkTableStructure(to, getInMemoryMetadataPtr(), /* metadata_version = */ nullptr, /* strict_check = */ true, /* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); } PinnedPartUUIDs src_pins; @@ -9388,7 +9527,7 @@ String StorageReplicatedMergeTree::getTableSharedID() const { /// Can happen if table was partially initialized before drop by DatabaseCatalog if (table_shared_id == UUIDHelpers::Nil) - createTableSharedID(); + createTableSharedID(/* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); } else { @@ -9403,7 +9542,20 @@ std::map StorageReplicatedMergeTree::getUnfinishe return queue.getUnfinishedMutations(); } -void StorageReplicatedMergeTree::createTableSharedID() const +void StorageReplicatedMergeTree::createTableSharedID(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const +{ + if (zookeeper_retries_info.max_retries > 0) + { + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createTableSharedID", log.load(), zookeeper_retries_info, process_list_element}; + retries_ctl.retryLoop([&] { createTableSharedIDAttempt(); }); + } + else + { + createTableSharedIDAttempt(); + } +} + +void StorageReplicatedMergeTree::createTableSharedIDAttempt() const { LOG_DEBUG(log, "Creating shared ID for table {}", getStorageID().getNameForLogs()); // can be set by the call to getTableSharedID diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index a790e548645..c6081d45bf4 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -108,7 +109,9 @@ public: const String & date_column_name, const MergingParams & merging_params_, std::unique_ptr settings_, - bool need_check_structure); + bool need_check_structure, + const ZooKeeperRetriesInfo & 
create_query_zookeeper_retries_info_, + QueryStatusPtr create_query_status_); void startup() override; @@ -313,7 +316,7 @@ public: /// Restores table metadata if ZooKeeper lost it. /// Used only on restarted readonly replicas (not checked). All active (Active) parts are moved to detached/ /// folder and attached. Parts in all other states are just moved to detached/ folder. - void restoreMetadataInZooKeeper(); + void restoreMetadataInZooKeeper(QueryStatusPtr query_status, const ZooKeeperRetriesInfo & zookeeper_retries_info); /// Get throttler for replicated fetches ThrottlerPtr getFetchesThrottler() const @@ -424,6 +427,10 @@ private: const String replica_name; // shorthand for zookeeper_info.replica_name const String replica_path; + ZooKeeperRetriesInfo create_query_zookeeper_retries_info TSA_GUARDED_BY(create_query_zookeeper_retries_info_mutex); + QueryStatusPtr create_query_status TSA_GUARDED_BY(create_query_zookeeper_retries_info_mutex); + mutable std::mutex create_query_zookeeper_retries_info_mutex; + /** /replicas/me/is_active. */ zkutil::EphemeralNodeHolderPtr replica_is_active_node; @@ -572,18 +579,28 @@ private: /** Creates the minimum set of nodes in ZooKeeper and create first replica. * Returns true if was created, false if exists. */ - bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot); + bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + bool createTableIfNotExistsAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const; /** * Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas. */ - void createReplica(const StorageMetadataPtr & metadata_snapshot); + void createReplica(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + void createReplicaAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const; /** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running. */ - void createNewZooKeeperNodes(); + void createNewZooKeeperNodes(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + void createNewZooKeeperNodesAttempt() const; - bool checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, bool strict_check = true); + /// Returns the ZooKeeper retries info specified for the CREATE TABLE query which is creating and starting this table right now. + ZooKeeperRetriesInfo getCreateQueryZooKeeperRetriesInfo() const; + QueryStatusPtr getCreateQueryStatus() const; + void clearCreateQueryZooKeeperRetriesInfo(); + + bool checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check, + const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + bool checkTableStructureAttempt(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check) const; /// A part of ALTER: apply metadata changes only (data parts are altered separately). /// Must be called under IStorage::lockForAlter() lock. @@ -602,7 +619,7 @@ private: /// Synchronize the list of part uuids which are currently pinned. 
These should be sent to root query executor /// to be used for deduplication. - void syncPinnedPartUUIDs(); + void syncPinnedPartUUIDs(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element); /** Check that the part's checksum is the same as the checksum of the same part on some other replica. * If no one has such a part, nothing checks. @@ -705,7 +722,7 @@ private: /// Start being leader (if not disabled by setting). /// Since multi-leaders are allowed, it just sets is_leader flag. - void startBeingLeader(); + void startBeingLeader(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element); void stopBeingLeader(); /** Selects the parts to merge and writes to the log. @@ -922,7 +939,7 @@ private: /// Check granularity of already existing replicated table in zookeeper if it exists /// return true if it's fixed - bool checkFixedGranularityInZookeeper(); + bool checkFixedGranularityInZookeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; /// Wait for timeout seconds mutation is finished on replicas void waitMutationToFinishOnReplicas( @@ -960,7 +977,8 @@ private: void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override; // Create table id if needed - void createTableSharedID() const; + void createTableSharedID(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + void createTableSharedIDAttempt() const; bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica); void watchZeroCopyLock(const String & part_name, const DiskPtr & disk); @@ -976,7 +994,7 @@ private: /// Or if node actually disappeared. bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override; - void startupImpl(bool from_attach_thread); + void startupImpl(bool from_attach_thread, const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element); std::vector getZookeeperZeroCopyLockPaths() const; static void dropZookeeperZeroCopyLockPaths(zkutil::ZooKeeperPtr zookeeper, From 136c5de31c79a41a9a2f006b00b94ce6de8af976 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Sun, 1 Dec 2024 23:21:10 +0100 Subject: [PATCH 2/2] Add field query_status to the ZooKeeperRetriesInfo structure. 
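
With this change the retry settings and the query status travel together: callers build a
ZooKeeperRetriesInfo that also carries the QueryStatusPtr (it may be nullptr), and
ZooKeeperRetriesControl no longer takes the process list element as a separate constructor
argument. A minimal sketch of the call pattern both patches converge on, assuming a context,
settings and logger are available at the call site; the operation name is a placeholder and
the concrete setting names differ per call site in this series:

    ZooKeeperRetriesInfo retries_info{
        settings[Setting::keeper_max_retries],
        settings[Setting::keeper_retry_initial_backoff_ms],
        settings[Setting::keeper_retry_max_backoff_ms],
        context->getProcessListElement()};   /// may be nullptr: then cancellation is not checked

    ZooKeeperRetriesControl retries_ctl{"SomeZooKeeperOperation", log, retries_info};
    retries_ctl.retryLoop([&]
    {
        auto zookeeper = getZooKeeper();
        /// A Keeper error thrown here triggers another attempt with exponentially growing
        /// backoff (capped by max_backoff_ms); between attempts the query status, when set,
        /// is checked via checkTimeLimit() so a cancelled query stops retrying.
        zookeeper->get(zookeeper_path + "/metadata");
    });
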
--- src/Backups/BackupCoordinationOnCluster.cpp | 4 +- src/Backups/BackupCoordinationOnCluster.h | 3 +- src/Backups/BackupEntriesCollector.cpp | 8 +- src/Backups/BackupEntriesCollector.h | 4 +- src/Backups/RestoreCoordinationOnCluster.cpp | 4 +- src/Backups/RestoreCoordinationOnCluster.h | 3 +- src/Backups/RestorerFromBackup.cpp | 3 +- src/Backups/WithRetries.cpp | 5 +- src/Common/ZooKeeper/ZooKeeperRetries.h | 24 ++--- src/Databases/DatabaseOrdinary.cpp | 2 +- src/Databases/DatabaseReplicatedWorker.cpp | 2 +- src/Databases/DatabaseReplicatedWorker.h | 2 +- src/Interpreters/DDLWorker.cpp | 4 +- src/Interpreters/DDLWorker.h | 2 +- .../DistributedQueryStatusSource.cpp | 11 +-- .../DistributedQueryStatusSource.h | 2 +- src/Interpreters/InterpreterSystemQuery.cpp | 4 +- src/Interpreters/executeDDLQueryOnCluster.cpp | 2 +- .../ReplicatedMergeTreeAttachThread.cpp | 10 +- .../MergeTree/ReplicatedMergeTreeSink.cpp | 8 +- .../MergeTree/registerStorageMergeTree.cpp | 13 ++- src/Storages/StorageKeeperMap.cpp | 40 +++++--- src/Storages/StorageReplicatedMergeTree.cpp | 99 ++++++++----------- src/Storages/StorageReplicatedMergeTree.h | 25 +++-- .../System/StorageSystemZooKeeper.cpp | 7 +- 25 files changed, 148 insertions(+), 143 deletions(-) diff --git a/src/Backups/BackupCoordinationOnCluster.cpp b/src/Backups/BackupCoordinationOnCluster.cpp index 1b14f226eff..4fee3bb952e 100644 --- a/src/Backups/BackupCoordinationOnCluster.cpp +++ b/src/Backups/BackupCoordinationOnCluster.cpp @@ -182,6 +182,7 @@ BackupCoordinationOnCluster::BackupCoordinationOnCluster( , current_host(current_host_) , current_host_index(findCurrentHostIndex(current_host, all_hosts)) , plain_backup(is_plain_backup_) + , process_list_element(process_list_element_) , log(getLogger("BackupCoordinationOnCluster")) , with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); }) , cleaner(/* is_restore = */ false, zookeeper_path, with_retries, log) @@ -273,7 +274,8 @@ ZooKeeperRetriesInfo BackupCoordinationOnCluster::getOnClusterInitializationKeep { return ZooKeeperRetriesInfo{keeper_settings.max_retries_while_initializing, static_cast(keeper_settings.retry_initial_backoff_ms.count()), - static_cast(keeper_settings.retry_max_backoff_ms.count())}; + static_cast(keeper_settings.retry_max_backoff_ms.count()), + process_list_element}; } void BackupCoordinationOnCluster::serializeToMultipleZooKeeperNodes(const String & path, const String & value, const String & logging_name) diff --git a/src/Backups/BackupCoordinationOnCluster.h b/src/Backups/BackupCoordinationOnCluster.h index b439ab619d8..9bfc33d9fbf 100644 --- a/src/Backups/BackupCoordinationOnCluster.h +++ b/src/Backups/BackupCoordinationOnCluster.h @@ -107,7 +107,8 @@ private: const String current_host; const size_t current_host_index; const bool plain_backup; - LoggerPtr const log; + const QueryStatusPtr process_list_element; + const LoggerPtr log; /// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`. 
const WithRetries with_retries; diff --git a/src/Backups/BackupEntriesCollector.cpp b/src/Backups/BackupEntriesCollector.cpp index 00a4471d994..1ff190c171d 100644 --- a/src/Backups/BackupEntriesCollector.cpp +++ b/src/Backups/BackupEntriesCollector.cpp @@ -111,10 +111,11 @@ BackupEntriesCollector::BackupEntriesCollector( context->getConfigRef().getUInt64("backups.max_sleep_before_next_attempt_to_collect_metadata", 5000)) , compare_collected_metadata(context->getConfigRef().getBool("backups.compare_collected_metadata", true)) , log(getLogger("BackupEntriesCollector")) - , global_zookeeper_retries_info( + , zookeeper_retries_info( context->getSettingsRef()[Setting::backup_restore_keeper_max_retries], context->getSettingsRef()[Setting::backup_restore_keeper_retry_initial_backoff_ms], - context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms]) + context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms], + context->getProcessListElementSafe()) , threadpool(threadpool_) { } @@ -582,8 +583,7 @@ std::vector> BackupEntriesCollector::findTablesInD try { /// Database or table could be replicated - so may use ZooKeeper. We need to retry. - auto zookeeper_retries_info = global_zookeeper_retries_info; - ZooKeeperRetriesControl retries_ctl("getTablesForBackup", log, zookeeper_retries_info, nullptr); + ZooKeeperRetriesControl retries_ctl("getTablesForBackup", log, zookeeper_retries_info); retries_ctl.retryLoop([&](){ db_tables = database->getTablesForBackup(filter_by_table_name, context); }); } catch (Exception & e) diff --git a/src/Backups/BackupEntriesCollector.h b/src/Backups/BackupEntriesCollector.h index 504489cce6b..d8b2dfd38e7 100644 --- a/src/Backups/BackupEntriesCollector.h +++ b/src/Backups/BackupEntriesCollector.h @@ -48,7 +48,7 @@ public: std::shared_ptr getBackupCoordination() const { return backup_coordination; } const ReadSettings & getReadSettings() const { return read_settings; } ContextPtr getContext() const { return context; } - const ZooKeeperRetriesInfo & getZooKeeperRetriesInfo() const { return global_zookeeper_retries_info; } + const ZooKeeperRetriesInfo & getZooKeeperRetriesInfo() const { return zookeeper_retries_info; } /// Returns all access entities which can be put into a backup. std::unordered_map getAllAccessEntities(); @@ -129,7 +129,7 @@ private: LoggerPtr log; /// Unfortunately we can use ZooKeeper for collecting information for backup /// and we need to retry... 
- ZooKeeperRetriesInfo global_zookeeper_retries_info; + ZooKeeperRetriesInfo zookeeper_retries_info; Strings all_hosts; DDLRenamingMap renaming_map; diff --git a/src/Backups/RestoreCoordinationOnCluster.cpp b/src/Backups/RestoreCoordinationOnCluster.cpp index fad7341c044..c85cca45fa7 100644 --- a/src/Backups/RestoreCoordinationOnCluster.cpp +++ b/src/Backups/RestoreCoordinationOnCluster.cpp @@ -33,6 +33,7 @@ RestoreCoordinationOnCluster::RestoreCoordinationOnCluster( , all_hosts_without_initiator(BackupCoordinationOnCluster::excludeInitiator(all_hosts)) , current_host(current_host_) , current_host_index(BackupCoordinationOnCluster::findCurrentHostIndex(current_host, all_hosts)) + , process_list_element(process_list_element_) , log(getLogger("RestoreCoordinationOnCluster")) , with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); }) , cleaner(/* is_restore = */ true, zookeeper_path, with_retries, log) @@ -122,7 +123,8 @@ ZooKeeperRetriesInfo RestoreCoordinationOnCluster::getOnClusterInitializationKee { return ZooKeeperRetriesInfo{keeper_settings.max_retries_while_initializing, static_cast(keeper_settings.retry_initial_backoff_ms.count()), - static_cast(keeper_settings.retry_max_backoff_ms.count())}; + static_cast(keeper_settings.retry_max_backoff_ms.count()), + process_list_element}; } bool RestoreCoordinationOnCluster::acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) diff --git a/src/Backups/RestoreCoordinationOnCluster.h b/src/Backups/RestoreCoordinationOnCluster.h index 99929cbdac3..890117e63e8 100644 --- a/src/Backups/RestoreCoordinationOnCluster.h +++ b/src/Backups/RestoreCoordinationOnCluster.h @@ -75,7 +75,8 @@ private: const Strings all_hosts_without_initiator; const String current_host; const size_t current_host_index; - LoggerPtr const log; + const QueryStatusPtr process_list_element; + const LoggerPtr log; /// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`. const WithRetries with_retries; diff --git a/src/Backups/RestorerFromBackup.cpp b/src/Backups/RestorerFromBackup.cpp index 9b3b2408706..249e4353296 100644 --- a/src/Backups/RestorerFromBackup.cpp +++ b/src/Backups/RestorerFromBackup.cpp @@ -109,7 +109,8 @@ RestorerFromBackup::RestorerFromBackup( , zookeeper_retries_info( context->getSettingsRef()[Setting::backup_restore_keeper_max_retries], context->getSettingsRef()[Setting::backup_restore_keeper_retry_initial_backoff_ms], - context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms]) + context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms], + context->getProcessListElementSafe()) , tables_dependencies("RestorerFromBackup") , thread_pool(thread_pool_) { diff --git a/src/Backups/WithRetries.cpp b/src/Backups/WithRetries.cpp index 9c18be3ca9e..53613b2149e 100644 --- a/src/Backups/WithRetries.cpp +++ b/src/Backups/WithRetries.cpp @@ -20,9 +20,10 @@ WithRetries::RetriesControlHolder::RetriesControlHolder(const WithRetries * pare : (kind == kErrorHandling) ? parent->settings.max_retries_while_handling_error : parent->settings.max_retries, parent->settings.retry_initial_backoff_ms.count(), - parent->settings.retry_max_backoff_ms.count()) + parent->settings.retry_max_backoff_ms.count(), + (kind == kErrorHandling) ? 
nullptr : parent->process_list_element) /// We don't use process_list_element while handling an error because the error handling can't be cancellable. - , retries_ctl(name, parent->log, info, (kind == kErrorHandling) ? nullptr : parent->process_list_element) + , retries_ctl(name, parent->log, info) , faulty_zookeeper(parent->getFaultyZooKeeper()) {} diff --git a/src/Common/ZooKeeper/ZooKeeperRetries.h b/src/Common/ZooKeeper/ZooKeeperRetries.h index acea521a7ce..9482f72cba0 100644 --- a/src/Common/ZooKeeper/ZooKeeperRetries.h +++ b/src/Common/ZooKeeper/ZooKeeperRetries.h @@ -16,21 +16,25 @@ namespace ErrorCodes struct ZooKeeperRetriesInfo { ZooKeeperRetriesInfo() = default; - ZooKeeperRetriesInfo(UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_) + + ZooKeeperRetriesInfo(UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_, QueryStatusPtr query_status_) : max_retries(max_retries_), initial_backoff_ms(std::min(initial_backoff_ms_, max_backoff_ms_)), max_backoff_ms(max_backoff_ms_) + , query_status(query_status_) { } UInt64 max_retries = 0; /// "max_retries = 0" means only one attempt. - UInt64 initial_backoff_ms = 100; - UInt64 max_backoff_ms = 5000; + UInt64 initial_backoff_ms = 0; + UInt64 max_backoff_ms = 0; + + QueryStatusPtr query_status; /// can be nullptr }; class ZooKeeperRetriesControl { public: - ZooKeeperRetriesControl(std::string name_, LoggerPtr logger_, ZooKeeperRetriesInfo retries_info_, QueryStatusPtr elem) - : name(std::move(name_)), logger(logger_), retries_info(retries_info_), process_list_element(elem) + ZooKeeperRetriesControl(std::string name_, LoggerPtr logger_, ZooKeeperRetriesInfo retries_info_) + : name(std::move(name_)), logger(logger_), retries_info(retries_info_) { } @@ -39,7 +43,6 @@ public: , logger(other.logger) , retries_info(other.retries_info) , total_failures(other.total_failures) - , process_list_element(other.process_list_element) , current_backoff_ms(other.current_backoff_ms) { } @@ -222,8 +225,8 @@ private: } /// Check if the query was cancelled. - if (process_list_element) - process_list_element->checkTimeLimit(); + if (retries_info.query_status) + retries_info.query_status->checkTimeLimit(); /// retries logLastError("will retry due to error"); @@ -231,8 +234,8 @@ private: current_backoff_ms = std::min(current_backoff_ms * 2, retries_info.max_backoff_ms); /// Check if the query was cancelled again after sleeping. 
- if (process_list_element) - process_list_element->checkTimeLimit(); + if (retries_info.query_status) + retries_info.query_status->checkTimeLimit(); return true; } @@ -288,7 +291,6 @@ private: std::function action_after_last_failed_retry = []() {}; bool iteration_succeeded = true; bool stop_retries = false; - QueryStatusPtr process_list_element; UInt64 current_iteration = 0; UInt64 current_backoff_ms = 0; diff --git a/src/Databases/DatabaseOrdinary.cpp b/src/Databases/DatabaseOrdinary.cpp index eed29c0d821..6c9f5b82116 100644 --- a/src/Databases/DatabaseOrdinary.cpp +++ b/src/Databases/DatabaseOrdinary.cpp @@ -408,7 +408,7 @@ void DatabaseOrdinary::restoreMetadataAfterConvertingToReplicated(StoragePtr tab } else { - rmt->restoreMetadataInZooKeeper(/* query_status = */ nullptr, /* zookeeper_retries_info = */ {}); + rmt->restoreMetadataInZooKeeper(/* zookeeper_retries_info = */ {}); LOG_INFO ( log, diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index 6a711c92332..0000096c1c1 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -199,7 +199,7 @@ void DatabaseReplicatedDDLWorker::initializeReplication() active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper); } -String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &, QueryStatusPtr) +String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &) { auto zookeeper = getAndSetZooKeeper(); return enqueueQueryImpl(zookeeper, entry, database); diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h index d2385cbdba3..2bb603354ac 100644 --- a/src/Databases/DatabaseReplicatedWorker.h +++ b/src/Databases/DatabaseReplicatedWorker.h @@ -24,7 +24,7 @@ class DatabaseReplicatedDDLWorker : public DDLWorker public: DatabaseReplicatedDDLWorker(DatabaseReplicated * db, ContextPtr context_); - String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &, QueryStatusPtr) override; + String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &) override; String tryEnqueueAndExecuteEntry(DDLLogEntry & entry, ContextPtr query_context); diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index b1c7635b62b..3b4d31996eb 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -1054,12 +1054,12 @@ void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperP } -String DDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info, QueryStatusPtr process_list_element) +String DDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info) { String node_path; if (retries_info.max_retries > 0) { - ZooKeeperRetriesControl retries_ctl{"DDLWorker::enqueueQuery", log, retries_info, process_list_element}; + ZooKeeperRetriesControl retries_ctl{"DDLWorker::enqueueQuery", log, retries_info}; retries_ctl.retryLoop([&]{ node_path = enqueueQueryAttempt(entry); }); diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index a5f47a51bb3..ec697046d03 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -68,7 +68,7 @@ public: virtual ~DDLWorker(); /// Pushes query into DDL queue, returns path to created node - virtual String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info, QueryStatusPtr 
process_list_element); + virtual String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info); /// Host ID (name:port) for logging purposes /// Note that in each task hosts are identified individually by name:port from initiator server cluster config diff --git a/src/Interpreters/DistributedQueryStatusSource.cpp b/src/Interpreters/DistributedQueryStatusSource.cpp index 83701d41c57..aaddb1bd4e2 100644 --- a/src/Interpreters/DistributedQueryStatusSource.cpp +++ b/src/Interpreters/DistributedQueryStatusSource.cpp @@ -133,8 +133,7 @@ ExecutionStatus DistributedQueryStatusSource::getExecutionStatus(const fs::path String status_data; bool finished_exists = false; - auto retries_ctl = ZooKeeperRetriesControl( - "executeDDLQueryOnCluster", getLogger("DDLQueryStatusSource"), getRetriesInfo(), context->getProcessListElement()); + auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", getLogger("DDLQueryStatusSource"), getRetriesInfo()); retries_ctl.retryLoop([&]() { finished_exists = context->getZooKeeper()->tryGet(status_path, status_data); }); if (finished_exists) status.tryDeserializeText(status_data); @@ -142,13 +141,14 @@ ExecutionStatus DistributedQueryStatusSource::getExecutionStatus(const fs::path return status; } -ZooKeeperRetriesInfo DistributedQueryStatusSource::getRetriesInfo() +ZooKeeperRetriesInfo DistributedQueryStatusSource::getRetriesInfo() const { const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef(); return ZooKeeperRetriesInfo( config_ref.getInt("distributed_ddl_keeper_max_retries", 5), config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100), - config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000)); + config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000), + context->getProcessListElement()); } std::pair DistributedQueryStatusSource::parseHostAndPort(const String & host_id) @@ -194,8 +194,7 @@ Chunk DistributedQueryStatusSource::generate() Strings tmp_active_hosts; { - auto retries_ctl = ZooKeeperRetriesControl( - "executeDistributedQueryOnCluster", getLogger(getName()), getRetriesInfo(), context->getProcessListElement()); + auto retries_ctl = ZooKeeperRetriesControl("executeDistributedQueryOnCluster", getLogger(getName()), getRetriesInfo()); retries_ctl.retryLoop( [&]() { diff --git a/src/Interpreters/DistributedQueryStatusSource.h b/src/Interpreters/DistributedQueryStatusSource.h index 4f58085a1f0..71315c5cd74 100644 --- a/src/Interpreters/DistributedQueryStatusSource.h +++ b/src/Interpreters/DistributedQueryStatusSource.h @@ -38,7 +38,7 @@ protected: Strings getNewAndUpdate(const Strings & current_finished_hosts); ExecutionStatus getExecutionStatus(const fs::path & status_path); - static ZooKeeperRetriesInfo getRetriesInfo(); + ZooKeeperRetriesInfo getRetriesInfo() const; static std::pair parseHostAndPort(const String & host_id); String node_path; diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index ca96ee3245f..ac5f03e3b24 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -884,10 +884,10 @@ void InterpreterSystemQuery::restoreReplica() const auto & settings = getContext()->getSettingsRef(); table_replicated_ptr->restoreMetadataInZooKeeper( - getContext()->getProcessListElementSafe(), ZooKeeperRetriesInfo{settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], - settings[Setting::keeper_retry_max_backoff_ms]}); + 
settings[Setting::keeper_retry_max_backoff_ms], + getContext()->getProcessListElementSafe()}); } StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context) diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp index 0b88d07148c..43ffa946a57 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.cpp +++ b/src/Interpreters/executeDDLQueryOnCluster.cpp @@ -189,7 +189,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context, entry.setSettingsIfRequired(context); entry.tracing_context = OpenTelemetry::CurrentContext(); entry.initial_query_id = context->getClientInfo().initial_query_id; - String node_path = ddl_worker.enqueueQuery(entry, params.retries_info, context->getProcessListElement()); + String node_path = ddl_worker.enqueueQuery(entry, params.retries_info); return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context); } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index c654b459c24..db5b1d5d0c9 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -167,23 +167,23 @@ void ReplicatedMergeTreeAttachThread::runImpl() zookeeper->tryRemove(replica_path + "/flags/force_restore_data"); /// Here `zookeeper_retries_info = {}` because the attach thread has its own retries (see ReplicatedMergeTreeAttachThread::run()). - storage.checkTableStructure(replica_path, metadata_snapshot, /* metadata_version = */ nullptr, /* strict_check = */ true, /* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); + storage.checkTableStructure(replica_path, metadata_snapshot, /* metadata_version = */ nullptr, /* strict_check = */ true, /* zookeeper_retries_info = */ {}); storage.checkParts(skip_sanity_checks); /// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart), /// don't allow to reinitialize them, delete each of them immediately. 
storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"}); - storage.createNewZooKeeperNodes(/* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); - storage.syncPinnedPartUUIDs(/* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); + storage.createNewZooKeeperNodes(/* zookeeper_retries_info = */ {}); + storage.syncPinnedPartUUIDs(/* zookeeper_retries_info = */ {}); std::lock_guard lock(storage.table_shared_id_mutex); - storage.createTableSharedID(/* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); + storage.createTableSharedID(/* zookeeper_retries_info = */ {}); }; void ReplicatedMergeTreeAttachThread::finalizeInitialization() TSA_NO_THREAD_SAFETY_ANALYSIS { - storage.startupImpl(/* from_attach_thread */ true, /* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); + storage.startupImpl(/* from_attach_thread */ true, /* zookeeper_retries_info = */ {}); storage.initialization_done = true; LOG_INFO(log, "Table is initialized"); } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 19a69eb46be..3422e534f7d 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -201,8 +201,8 @@ size_t ReplicatedMergeTreeSinkImpl::checkQuorumPrecondition(const log, {settings[Setting::insert_keeper_max_retries], settings[Setting::insert_keeper_retry_initial_backoff_ms], - settings[Setting::insert_keeper_retry_max_backoff_ms]}, - context->getProcessListElement()); + settings[Setting::insert_keeper_retry_max_backoff_ms], + context->getProcessListElement()}); quorum_retries_ctl.retryLoop( [&]() { @@ -725,8 +725,8 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: log, {settings[Setting::insert_keeper_max_retries], settings[Setting::insert_keeper_retry_initial_backoff_ms], - settings[Setting::insert_keeper_retry_max_backoff_ms]}, - context->getProcessListElement()); + settings[Setting::insert_keeper_retry_max_backoff_ms], + context->getProcessListElement()}); auto resolve_duplicate_stage = [&] () -> CommitRetryContext::Stages { diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index 12b5d115903..709f019a005 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -835,11 +835,11 @@ static StoragePtr create(const StorageFactory::Arguments & args) if (auto txn = args.getLocalContext()->getZooKeeperMetadataTransaction()) need_check_table_structure = txn->isInitialQuery(); - ZooKeeperRetriesInfo create_query_zk_retries_info; - create_query_zk_retries_info.max_retries = local_settings[Setting::keeper_max_retries]; - create_query_zk_retries_info.initial_backoff_ms = local_settings[Setting::keeper_retry_initial_backoff_ms]; - create_query_zk_retries_info.max_backoff_ms = local_settings[Setting::keeper_retry_max_backoff_ms]; - auto create_query_status = args.getLocalContext()->getProcessListElementSafe(); + ZooKeeperRetriesInfo create_query_zk_retries_info{ + local_settings[Setting::keeper_max_retries], + local_settings[Setting::keeper_retry_initial_backoff_ms], + local_settings[Setting::keeper_retry_max_backoff_ms], + args.getLocalContext()->getProcessListElementSafe()}; return std::make_shared( zookeeper_info, @@ -852,8 +852,7 @@ static StoragePtr create(const StorageFactory::Arguments & args) merging_params, std::move(storage_settings), 
need_check_table_structure, - create_query_zk_retries_info, - create_query_status); + create_query_zk_retries_info); } return std::make_shared( diff --git a/src/Storages/StorageKeeperMap.cpp b/src/Storages/StorageKeeperMap.cpp index 2a4a5f3370f..1504656bce5 100644 --- a/src/Storages/StorageKeeperMap.cpp +++ b/src/Storages/StorageKeeperMap.cpp @@ -189,8 +189,8 @@ public: ZooKeeperRetriesInfo{ settings[Setting::insert_keeper_max_retries], settings[Setting::insert_keeper_retry_initial_backoff_ms], - settings[Setting::insert_keeper_retry_max_backoff_ms]}, - context->getProcessListElement()}; + settings[Setting::insert_keeper_retry_max_backoff_ms], + context->getProcessListElement()}}; zk_retry.retryLoop([&]() { @@ -425,8 +425,10 @@ StorageKeeperMap::StorageKeeperMap( getName(), getLogger(getName()), ZooKeeperRetriesInfo{ - settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, - context_->getProcessListElement()}; + settings[Setting::keeper_max_retries], + settings[Setting::keeper_retry_initial_backoff_ms], + settings[Setting::keeper_retry_max_backoff_ms], + context_->getProcessListElement()}}; zk_retry.retryLoop( [&] @@ -670,8 +672,10 @@ Pipe StorageKeeperMap::read( getName(), getLogger(getName()), ZooKeeperRetriesInfo{ - settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, - context_->getProcessListElement()}; + settings[Setting::keeper_max_retries], + settings[Setting::keeper_retry_initial_backoff_ms], + settings[Setting::keeper_retry_max_backoff_ms], + context_->getProcessListElement()}}; std::vector children; zk_retry.retryLoop([&] @@ -699,8 +703,10 @@ void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont getName(), getLogger(getName()), ZooKeeperRetriesInfo{ - settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, - local_context->getProcessListElement()}; + settings[Setting::keeper_max_retries], + settings[Setting::keeper_retry_initial_backoff_ms], + settings[Setting::keeper_retry_max_backoff_ms], + local_context->getProcessListElement()}}; zk_retry.retryLoop([&] { @@ -1136,8 +1142,10 @@ StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus(const ContextPtr getName(), getLogger(getName()), ZooKeeperRetriesInfo{ - settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, - local_context->getProcessListElement()}; + settings[Setting::keeper_max_retries], + settings[Setting::keeper_retry_initial_backoff_ms], + settings[Setting::keeper_retry_max_backoff_ms], + local_context->getProcessListElement()}}; zk_retry.retryLoop([&] { @@ -1248,8 +1256,10 @@ Chunk StorageKeeperMap::getBySerializedKeys( getName(), getLogger(getName()), ZooKeeperRetriesInfo{ - settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, - local_context->getProcessListElement()}; + settings[Setting::keeper_max_retries], + settings[Setting::keeper_retry_initial_backoff_ms], + settings[Setting::keeper_retry_max_backoff_ms], + local_context->getProcessListElement()}}; zkutil::ZooKeeper::MultiTryGetResponse values; zk_retry.retryLoop([&]{ @@ -1394,8 +1404,10 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca getName(), getLogger(getName()), 
ZooKeeperRetriesInfo{ - settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, - local_context->getProcessListElement()}; + settings[Setting::keeper_max_retries], + settings[Setting::keeper_retry_initial_backoff_ms], + settings[Setting::keeper_retry_max_backoff_ms], + local_context->getProcessListElement()}}; Coordination::Error status; zk_retry.retryLoop([&] diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ac1b37ecbdb..b57dcd144f2 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -367,8 +367,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( const MergingParams & merging_params_, std::unique_ptr settings_, bool need_check_structure, - const ZooKeeperRetriesInfo & create_query_zookeeper_retries_info_, - QueryStatusPtr create_query_status_) + const ZooKeeperRetriesInfo & create_query_zookeeper_retries_info_) : MergeTreeData(table_id_, metadata_, context_, @@ -383,7 +382,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( , replica_name(zookeeper_info.replica_name) , replica_path(fs::path(zookeeper_path) / "replicas" / replica_name) , create_query_zookeeper_retries_info(create_query_zookeeper_retries_info_) - , create_query_status(create_query_status_) , reader(*this) , writer(*this) , merger_mutator(*this) @@ -573,7 +571,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( try { - bool is_first_replica = createTableIfNotExists(metadata_snapshot, getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); + bool is_first_replica = createTableIfNotExists(metadata_snapshot, getCreateQueryZooKeeperRetriesInfo()); try { @@ -582,13 +580,13 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( /// We have to check granularity on other replicas. If it's fixed we /// must create our new replica with fixed granularity and store this /// information in /replica/metadata. - other_replicas_fixed_granularity = checkFixedGranularityInZookeeper(getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); + other_replicas_fixed_granularity = checkFixedGranularityInZookeeper(getCreateQueryZooKeeperRetriesInfo()); /// Allow structure mismatch for secondary queries from Replicated database. /// It may happen if the table was altered just after creation. /// Metadata will be updated in cloneMetadataIfNeeded(...), metadata_version will be 0 for a while. 
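A side note for reviewers (nothing below is in the patch): in StorageKeeperMap every ZooKeeper access point touched above (read, truncate, mutate, getTableStatus, getBySerializedKeys) now builds the same four-value ZooKeeperRetriesInfo inline, with the process list element as the last member, and the write path does the same with the insert_keeper_* settings. If the repetition ever becomes bothersome, it could be factored roughly as in this sketch; the helper name is invented:

    // Sketch only: one place to build the retry control that the KeeperMap hunks above
    // construct inline at every call site. Relies on C++17 guaranteed copy elision for
    // the by-value return.
    ZooKeeperRetriesControl makeKeeperMapRetries(const String & name, LoggerPtr log, ContextPtr local_context)
    {
        const auto & settings = local_context->getSettingsRef();
        return ZooKeeperRetriesControl{
            name,
            log,
            ZooKeeperRetriesInfo{
                settings[Setting::keeper_max_retries],
                settings[Setting::keeper_retry_initial_backoff_ms],
                settings[Setting::keeper_retry_max_backoff_ms],
                local_context->getProcessListElement()}};
    }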
int32_t metadata_version; - bool same_structure = checkTableStructure(zookeeper_path, metadata_snapshot, &metadata_version, need_check_structure, getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); + bool same_structure = checkTableStructure(zookeeper_path, metadata_snapshot, &metadata_version, need_check_structure, getCreateQueryZooKeeperRetriesInfo()); if (same_structure) { @@ -609,13 +607,13 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } if (!is_first_replica) - createReplica(metadata_snapshot, getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); + createReplica(metadata_snapshot, getCreateQueryZooKeeperRetriesInfo()); - createNewZooKeeperNodes(getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); - syncPinnedPartUUIDs(getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); + createNewZooKeeperNodes(getCreateQueryZooKeeperRetriesInfo()); + syncPinnedPartUUIDs(getCreateQueryZooKeeperRetriesInfo()); if (!has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper) - createTableSharedID(getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); + createTableSharedID(getCreateQueryZooKeeperRetriesInfo()); } catch (...) { @@ -628,7 +626,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } -bool StorageReplicatedMergeTree::checkFixedGranularityInZookeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const +bool StorageReplicatedMergeTree::checkFixedGranularityInZookeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info) const { bool fixed_granularity = false; @@ -642,7 +640,7 @@ bool StorageReplicatedMergeTree::checkFixedGranularityInZookeeper(const ZooKeepe if (zookeeper_retries_info.max_retries > 0) { - ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::checkFixedGranularityInZookeeper", log.load(), zookeeper_retries_info, process_list_element}; + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::checkFixedGranularityInZookeeper", log.load(), zookeeper_retries_info}; retries_ctl.retryLoop([&] { check_fixed_granularity(); }); } else @@ -825,11 +823,11 @@ std::vector getAncestors(const String & path) } -void StorageReplicatedMergeTree::createNewZooKeeperNodes(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const +void StorageReplicatedMergeTree::createNewZooKeeperNodes(const ZooKeeperRetriesInfo & zookeeper_retries_info) const { if (zookeeper_retries_info.max_retries > 0) { - ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createNewZooKeeperNodes", log.load(), zookeeper_retries_info, process_list_element}; + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createNewZooKeeperNodes", log.load(), zookeeper_retries_info}; retries_ctl.retryLoop([&] { createNewZooKeeperNodesAttempt(); }); } else @@ -903,19 +901,17 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodesAttempt() const } } -bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot, - const ZooKeeperRetriesInfo & zookeeper_retries_info, - QueryStatusPtr process_list_element) const +bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info) const { bool table_created = false; if (zookeeper_retries_info.max_retries > 0) { - ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createTableIfNotExists", log.load(), zookeeper_retries_info, process_list_element}; - 
retries_ctl.retryLoop([&] { table_created = createTableIfNotExistsAttempt(metadata_snapshot, process_list_element); }); + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createTableIfNotExists", log.load(), zookeeper_retries_info}; + retries_ctl.retryLoop([&] { table_created = createTableIfNotExistsAttempt(metadata_snapshot, zookeeper_retries_info.query_status); }); } else { - table_created = createTableIfNotExistsAttempt(metadata_snapshot, process_list_element); + table_created = createTableIfNotExistsAttempt(metadata_snapshot, zookeeper_retries_info.query_status); } return table_created; } @@ -1069,18 +1065,16 @@ bool StorageReplicatedMergeTree::createTableIfNotExistsAttempt(const StorageMeta "of wrong zookeeper_path or because of logical error"); } -void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metadata_snapshot, - const ZooKeeperRetriesInfo & zookeeper_retries_info, - QueryStatusPtr process_list_element) const +void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info) const { if (zookeeper_retries_info.max_retries > 0) { - ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createReplica", log.load(), zookeeper_retries_info, process_list_element}; - retries_ctl.retryLoop([&] { createReplicaAttempt(metadata_snapshot, process_list_element); }); + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createReplica", log.load(), zookeeper_retries_info}; + retries_ctl.retryLoop([&] { createReplicaAttempt(metadata_snapshot, zookeeper_retries_info.query_status); }); } else { - createReplicaAttempt(metadata_snapshot, process_list_element); + createReplicaAttempt(metadata_snapshot, zookeeper_retries_info.query_status); } } @@ -1244,17 +1238,10 @@ ZooKeeperRetriesInfo StorageReplicatedMergeTree::getCreateQueryZooKeeperRetriesI return create_query_zookeeper_retries_info; } -QueryStatusPtr StorageReplicatedMergeTree::getCreateQueryStatus() const -{ - std::lock_guard lock{create_query_zookeeper_retries_info_mutex}; - return create_query_status; -} - void StorageReplicatedMergeTree::clearCreateQueryZooKeeperRetriesInfo() { std::lock_guard lock{create_query_zookeeper_retries_info_mutex}; create_query_zookeeper_retries_info = {}; - create_query_status = {}; } @@ -1620,12 +1607,12 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper */ bool StorageReplicatedMergeTree::checkTableStructure( const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check, - const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const + const ZooKeeperRetriesInfo & zookeeper_retries_info) const { bool same_structure = false; if (zookeeper_retries_info.max_retries > 0) { - ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::checkTableStructure", log.load(), zookeeper_retries_info, process_list_element}; + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::checkTableStructure", log.load(), zookeeper_retries_info}; retries_ctl.retryLoop([&] { same_structure = checkTableStructureAttempt(zookeeper_prefix, metadata_snapshot, metadata_version, strict_check); }); } else @@ -1970,7 +1957,7 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks) } -void StorageReplicatedMergeTree::syncPinnedPartUUIDs(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) +void 
StorageReplicatedMergeTree::syncPinnedPartUUIDs(const ZooKeeperRetriesInfo & zookeeper_retries_info) { String new_pinned_part_uuids_str; Coordination::Stat new_stat; @@ -1983,7 +1970,7 @@ void StorageReplicatedMergeTree::syncPinnedPartUUIDs(const ZooKeeperRetriesInfo if (zookeeper_retries_info.max_retries > 0) { - ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::syncPinnedPartUUIDs", log.load(), zookeeper_retries_info, process_list_element}; + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::syncPinnedPartUUIDs", log.load(), zookeeper_retries_info}; retries_ctl.retryLoop([&] { read_pinned_part_uuids(); }); } else @@ -2352,7 +2339,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) case LogEntry::ALTER_METADATA: return executeMetadataAlter(entry); case LogEntry::SYNC_PINNED_PART_UUIDS: - syncPinnedPartUUIDs(/* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); + syncPinnedPartUUIDs(/* zookeeper_retries_info = */ {}); return true; case LogEntry::CLONE_PART_FROM_SHARD: executeClonePartFromShard(entry); @@ -4501,7 +4488,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n } -void StorageReplicatedMergeTree::startBeingLeader(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) +void StorageReplicatedMergeTree::startBeingLeader(const ZooKeeperRetriesInfo & zookeeper_retries_info) { if (!(*getSettings())[MergeTreeSetting::replicated_can_become_leader]) { @@ -4517,7 +4504,7 @@ void StorageReplicatedMergeTree::startBeingLeader(const ZooKeeperRetriesInfo & z if (zookeeper_retries_info.max_retries > 0) { - ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::startBeingLeader", log.load(), zookeeper_retries_info, process_list_element}; + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::startBeingLeader", log.load(), zookeeper_retries_info}; retries_ctl.retryLoop([&] { start_being_leader(); }); } else @@ -5396,10 +5383,10 @@ void StorageReplicatedMergeTree::startup() return; } - startupImpl(/* from_attach_thread */ false, getCreateQueryZooKeeperRetriesInfo(), getCreateQueryStatus()); + startupImpl(/* from_attach_thread */ false, getCreateQueryZooKeeperRetriesInfo()); } -void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread, const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) +void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread, const ZooKeeperRetriesInfo & zookeeper_retries_info) { /// Do not start replication if ZooKeeper is not configured or there is no metadata in zookeeper if (!has_metadata_in_zookeeper.has_value() || !*has_metadata_in_zookeeper) @@ -5428,7 +5415,7 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread, const ZooK getContext()->getInterserverIOHandler().addEndpoint( data_parts_exchange_ptr->getId(getEndpointName()), data_parts_exchange_ptr); - startBeingLeader(zookeeper_retries_info, process_list_element); + startBeingLeader(zookeeper_retries_info); if (from_attach_thread) { @@ -6683,7 +6670,7 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition( return true; } -void StorageReplicatedMergeTree::restoreMetadataInZooKeeper(QueryStatusPtr query_status, const ZooKeeperRetriesInfo & zookeeper_retries_info) +void StorageReplicatedMergeTree::restoreMetadataInZooKeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info) { LOG_INFO(log, "Restoring replica metadata"); @@ -6726,14 +6713,14 @@ void 
StorageReplicatedMergeTree::restoreMetadataInZooKeeper(QueryStatusPtr query LOG_INFO(log, "Moved all parts to detached/"); - const bool is_first_replica = createTableIfNotExists(metadata_snapshot, zookeeper_retries_info, query_status); + const bool is_first_replica = createTableIfNotExists(metadata_snapshot, zookeeper_retries_info); LOG_INFO(log, "Created initial ZK nodes, replica is first: {}", is_first_replica); if (!is_first_replica) - createReplica(metadata_snapshot, zookeeper_retries_info, query_status); + createReplica(metadata_snapshot, zookeeper_retries_info); - createNewZooKeeperNodes(zookeeper_retries_info, query_status); + createNewZooKeeperNodes(zookeeper_retries_info); LOG_INFO(log, "Created ZK nodes for table"); @@ -6745,7 +6732,7 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper(QueryStatusPtr query LOG_INFO(log, "Attached all partitions, starting table"); - startupImpl(/* from_attach_thread */ false, zookeeper_retries_info, query_status); + startupImpl(/* from_attach_thread */ false, zookeeper_retries_info); } void StorageReplicatedMergeTree::dropPartNoWaitNoThrow(const String & part_name) @@ -8034,8 +8021,8 @@ void StorageReplicatedMergeTree::forcefullyRemoveBrokenOutdatedPartFromZooKeeper String part_path = replica_path + "/parts/" + part_name; const auto & settings = getContext()->getSettingsRef(); ZooKeeperRetriesInfo retries_info{ - settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}; - ZooKeeperRetriesControl retries_ctl("outdatedPartExists", log.load(), retries_info, nullptr); + settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms], nullptr}; + ZooKeeperRetriesControl retries_ctl("outdatedPartExists", log.load(), retries_info); retries_ctl.retryLoop([&]() { exists = getZooKeeper()->exists(part_path); }); if (!exists) @@ -8924,7 +8911,7 @@ void StorageReplicatedMergeTree::movePartitionToShard( { /// Optimistic check that for compatible destination table structure. 
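Another pattern worth naming, again as commentary rather than patch content: the helpers above (createTableIfNotExists, createReplica, createNewZooKeeperNodes, syncPinnedPartUUIDs, startBeingLeader, checkTableStructure, createTableSharedID) all repeat the same guard, running the attempt once when the caller passed an empty ZooKeeperRetriesInfo and wrapping it in a retry loop otherwise. Factored out, the idiom looks roughly like the sketch below; the patch keeps the check inline in each function and the helper name here is invented:

    // Sketch only: an empty ZooKeeperRetriesInfo (max_retries == 0) means "single attempt,
    // no retry loop"; a non-empty one wraps the attempt in ZooKeeperRetriesControl::retryLoop.
    template <typename Attempt>
    void runWithOptionalRetries(const String & name, LoggerPtr log,
                                const ZooKeeperRetriesInfo & retries_info, Attempt && attempt)
    {
        if (retries_info.max_retries > 0)
        {
            ZooKeeperRetriesControl retries_ctl{name, log, retries_info};
            retries_ctl.retryLoop(std::forward<Attempt>(attempt));
        }
        else
        {
            attempt();   // e.g. the attach thread and executeLogEntry pass {} here
        }
    }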
- checkTableStructure(to, getInMemoryMetadataPtr(), /* metadata_version = */ nullptr, /* strict_check = */ true, /* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); + checkTableStructure(to, getInMemoryMetadataPtr(), /* metadata_version = */ nullptr, /* strict_check = */ true, /* zookeeper_retries_info = */ {}); } PinnedPartUUIDs src_pins; @@ -9527,7 +9514,7 @@ String StorageReplicatedMergeTree::getTableSharedID() const { /// Can happen if table was partially initialized before drop by DatabaseCatalog if (table_shared_id == UUIDHelpers::Nil) - createTableSharedID(/* zookeeper_retries_info = */ {}, /* process_list_element = */ nullptr); + createTableSharedID(/* zookeeper_retries_info = */ {}); } else { @@ -9542,11 +9529,11 @@ std::map StorageReplicatedMergeTree::getUnfinishe return queue.getUnfinishedMutations(); } -void StorageReplicatedMergeTree::createTableSharedID(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const +void StorageReplicatedMergeTree::createTableSharedID(const ZooKeeperRetriesInfo & zookeeper_retries_info) const { if (zookeeper_retries_info.max_retries > 0) { - ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createTableSharedID", log.load(), zookeeper_retries_info, process_list_element}; + ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createTableSharedID", log.load(), zookeeper_retries_info}; retries_ctl.retryLoop([&] { createTableSharedIDAttempt(); }); } else @@ -10836,7 +10823,7 @@ void StorageReplicatedMergeTree::backupData( bool exists = false; Strings mutation_ids; { - ZooKeeperRetriesControl retries_ctl("getMutations", log.load(), zookeeper_retries_info, nullptr); + ZooKeeperRetriesControl retries_ctl("getMutations", log.load(), zookeeper_retries_info); retries_ctl.retryLoop([&]() { if (!zookeeper || zookeeper->expired()) @@ -10855,7 +10842,7 @@ void StorageReplicatedMergeTree::backupData( bool mutation_id_exists = false; String mutation; - ZooKeeperRetriesControl retries_ctl("getMutation", log.load(), zookeeper_retries_info, nullptr); + ZooKeeperRetriesControl retries_ctl("getMutation", log.load(), zookeeper_retries_info); retries_ctl.retryLoop([&]() { if (!zookeeper || zookeeper->expired()) diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index c6081d45bf4..f153ae91361 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -110,8 +110,7 @@ public: const MergingParams & merging_params_, std::unique_ptr settings_, bool need_check_structure, - const ZooKeeperRetriesInfo & create_query_zookeeper_retries_info_, - QueryStatusPtr create_query_status_); + const ZooKeeperRetriesInfo & create_query_zookeeper_retries_info_); void startup() override; @@ -316,7 +315,7 @@ public: /// Restores table metadata if ZooKeeper lost it. /// Used only on restarted readonly replicas (not checked). All active (Active) parts are moved to detached/ /// folder and attached. Parts in all other states are just moved to detached/ folder. 
- void restoreMetadataInZooKeeper(QueryStatusPtr query_status, const ZooKeeperRetriesInfo & zookeeper_retries_info); + void restoreMetadataInZooKeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info); /// Get throttler for replicated fetches ThrottlerPtr getFetchesThrottler() const @@ -428,7 +427,6 @@ private: const String replica_path; ZooKeeperRetriesInfo create_query_zookeeper_retries_info TSA_GUARDED_BY(create_query_zookeeper_retries_info_mutex); - QueryStatusPtr create_query_status TSA_GUARDED_BY(create_query_zookeeper_retries_info_mutex); mutable std::mutex create_query_zookeeper_retries_info_mutex; /** /replicas/me/is_active. @@ -579,27 +577,26 @@ private: /** Creates the minimum set of nodes in ZooKeeper and create first replica. * Returns true if was created, false if exists. */ - bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info) const; bool createTableIfNotExistsAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const; /** * Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas. */ - void createReplica(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + void createReplica(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info) const; void createReplicaAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const; /** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running. */ - void createNewZooKeeperNodes(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + void createNewZooKeeperNodes(const ZooKeeperRetriesInfo & zookeeper_retries_info) const; void createNewZooKeeperNodesAttempt() const; /// Returns the ZooKeeper retries info specified for the CREATE TABLE query which is creating and starting this table right now. ZooKeeperRetriesInfo getCreateQueryZooKeeperRetriesInfo() const; - QueryStatusPtr getCreateQueryStatus() const; void clearCreateQueryZooKeeperRetriesInfo(); bool checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check, - const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + const ZooKeeperRetriesInfo & zookeeper_retries_info) const; bool checkTableStructureAttempt(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check) const; /// A part of ALTER: apply metadata changes only (data parts are altered separately). @@ -619,7 +616,7 @@ private: /// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor /// to be used for deduplication. - void syncPinnedPartUUIDs(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element); + void syncPinnedPartUUIDs(const ZooKeeperRetriesInfo & zookeeper_retries_info); /** Check that the part's checksum is the same as the checksum of the same part on some other replica. * If no one has such a part, nothing checks. 
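Reviewer's note (not patch content): the header changes above summarize the whole refactoring. The private helpers lose their QueryStatusPtr parameters because the retries info carries the query status, while the *Attempt functions still receive it explicitly, pulled back out as zookeeper_retries_info.query_status by their wrappers. The definition of ZooKeeperRetriesInfo is not part of this excerpt; reconstructed from its uses in the patch it looks roughly like the sketch below. The backoff field names follow the settings they are filled from, and the real type may declare constructors rather than being a plain aggregate:

    // Assumed shape, reconstructed from call sites; not copied from the real header.
    struct ZooKeeperRetriesInfo
    {
        UInt64 max_retries = 0;        // 0 => callers skip the retry loop entirely
        UInt64 initial_backoff_ms = 0;
        UInt64 max_backoff_ms = 0;
        QueryStatusPtr query_status;   // presumably lets the retry loop notice a killed query
    };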
@@ -722,7 +719,7 @@ private: /// Start being leader (if not disabled by setting). /// Since multi-leaders are allowed, it just sets is_leader flag. - void startBeingLeader(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element); + void startBeingLeader(const ZooKeeperRetriesInfo & zookeeper_retries_info); void stopBeingLeader(); /** Selects the parts to merge and writes to the log. @@ -939,7 +936,7 @@ private: /// Check granularity of already existing replicated table in zookeeper if it exists /// return true if it's fixed - bool checkFixedGranularityInZookeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + bool checkFixedGranularityInZookeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info) const; /// Wait for timeout seconds mutation is finished on replicas void waitMutationToFinishOnReplicas( @@ -977,7 +974,7 @@ private: void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override; // Create table id if needed - void createTableSharedID(const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element) const; + void createTableSharedID(const ZooKeeperRetriesInfo & zookeeper_retries_info) const; void createTableSharedIDAttempt() const; bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica); @@ -994,7 +991,7 @@ private: /// Or if node actually disappeared. bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override; - void startupImpl(bool from_attach_thread, const ZooKeeperRetriesInfo & zookeeper_retries_info, QueryStatusPtr process_list_element); + void startupImpl(bool from_attach_thread, const ZooKeeperRetriesInfo & zookeeper_retries_info); std::vector getZookeeperZeroCopyLockPaths() const; static void dropZookeeperZeroCopyLockPaths(zkutil::ZooKeeperPtr zookeeper, diff --git a/src/Storages/System/StorageSystemZooKeeper.cpp b/src/Storages/System/StorageSystemZooKeeper.cpp index 000098af80d..1cd17cc0cd3 100644 --- a/src/Storages/System/StorageSystemZooKeeper.cpp +++ b/src/Storages/System/StorageSystemZooKeeper.cpp @@ -518,7 +518,8 @@ Chunk SystemZooKeeperSource::generate() ZooKeeperRetriesInfo retries_seetings( settings[Setting::insert_keeper_max_retries], settings[Setting::insert_keeper_retry_initial_backoff_ms], - settings[Setting::insert_keeper_retry_max_backoff_ms]); + settings[Setting::insert_keeper_retry_max_backoff_ms], + query_status); /// Handles reconnects when needed auto get_zookeeper = [&] () @@ -586,7 +587,7 @@ Chunk SystemZooKeeperSource::generate() } zkutil::ZooKeeper::MultiTryGetChildrenResponse list_responses; - ZooKeeperRetriesControl("", nullptr, retries_seetings, query_status).retryLoop( + ZooKeeperRetriesControl("", nullptr, retries_seetings).retryLoop( [&]() { list_responses = get_zookeeper()->tryGetChildren(paths_to_list); }); struct GetTask @@ -632,7 +633,7 @@ Chunk SystemZooKeeperSource::generate() } zkutil::ZooKeeper::MultiTryGetResponse get_responses; - ZooKeeperRetriesControl("", nullptr, retries_seetings, query_status).retryLoop( + ZooKeeperRetriesControl("", nullptr, retries_seetings).retryLoop( [&]() { get_responses = get_zookeeper()->tryGet(paths_to_get); }); /// Add children count to query total rows. We can not get total rows in advance,
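A closing note, still commentary rather than patch content: two variants of the same idea are worth keeping apart when reading the diff. Query-driven paths (CREATE, RESTORE REPLICA, system.zookeeper reads) size their retries from query settings and attach the query's process list element; the distributed DDL status source sizes them from server configuration but still attaches the process list element; purely background paths (forcefullyRemoveBrokenOutdatedPartFromZooKeeper, backupData) pass nullptr. A hedged sketch of the config-driven form, using the keys and defaults visible in the getRetriesInfo() hunk earlier; the free function and its name are invented:

    // Sketch only: distributed DDL builds its retry budget from server config, not from
    // query settings, while still carrying the query status (presumably so the loop can
    // observe cancellation).
    ZooKeeperRetriesInfo ddlStatusRetries(ContextPtr context)
    {
        const auto & config = Context::getGlobalContextInstance()->getConfigRef();
        return ZooKeeperRetriesInfo(
            config.getInt("distributed_ddl_keeper_max_retries", 5),
            config.getInt("distributed_ddl_keeper_initial_backoff_ms", 100),
            config.getInt("distributed_ddl_keeper_max_backoff_ms", 5000),
            context->getProcessListElement());
    }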