diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index fbe1bede91a..5b37e4d6024 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -315,7 +315,7 @@ public: return std::make_shared(path, zookeeper, false, false, ""); } - void reset() + void setAlreadyRemoved() { need_remove = false; } diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index 2065e036863..71e0effb2d2 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -115,11 +115,14 @@ void DatabaseAtomic::dropTable(const Context & context, const String & table_nam std::unique_lock lock(mutex); table = getTableUnlocked(table_name, lock); table_metadata_path_drop = DatabaseCatalog::instance().getPathForDroppedMetadata(table->getStorageID()); - auto txn = context.getMetadataTransaction(); + auto txn = context.getZooKeeperMetadataTransaction(); if (txn && !context.isInternalSubquery()) txn->commit(); /// Commit point (a sort of) for Replicated database /// NOTE: replica will be lost if server crashes before the following rename + /// We apply changes in ZooKeeper before applying changes in local metadata file + /// to reduce probability of failures between these operations + /// (it's more likely to lose connection, than to fail before applying local changes). 
/// TODO better detection and recovery Poco::File(table_metadata_path).renameTo(table_metadata_path_drop); /// Mark table as dropped @@ -241,7 +244,7 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n } /// Table renaming actually begins here - auto txn = context.getMetadataTransaction(); + auto txn = context.getZooKeeperMetadataTransaction(); if (txn && !context.isInternalSubquery()) txn->commit(); /// Commit point (a sort of) for Replicated database @@ -302,7 +305,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora DatabaseCatalog::instance().addUUIDMapping(query.uuid); locked_uuid = true; - auto txn = query_context.getMetadataTransaction(); + auto txn = query_context.getZooKeeperMetadataTransaction(); if (txn && !query_context.isInternalSubquery()) txn->commit(); /// Commit point (a sort of) for Replicated database @@ -337,7 +340,7 @@ void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String & if (table_id.uuid != actual_table_id.uuid) throw Exception("Cannot alter table because it was renamed", ErrorCodes::CANNOT_ASSIGN_ALTER); - auto txn = query_context.getMetadataTransaction(); + auto txn = query_context.getZooKeeperMetadataTransaction(); if (txn && !query_context.isInternalSubquery()) txn->commit(); /// Commit point (a sort of) for Replicated database diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index ca2b9bb083e..cd0143556c9 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -103,8 +103,11 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String if (engine_define->engine->arguments && !engine_may_have_arguments) throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS); - if (engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key || engine_define->order_by || - 
engine_define->sample_by || (!endsWith(engine_name, "MySQL") && engine_define->settings)) + bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by || + engine_define->primary_key || engine_define->order_by || + engine_define->sample_by; + bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated"; + if (has_unexpected_element || (!may_have_settings && engine_define->settings)) throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings", ErrorCodes::UNKNOWN_ELEMENT_IN_AST); @@ -205,7 +208,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String shard_name = context.getMacros()->expand(shard_name); replica_name = context.getMacros()->expand(replica_name); - return std::make_shared(database_name, metadata_path, uuid, zookeeper_path, shard_name, replica_name, context); + DatabaseReplicatedSettings database_replicated_settings{}; + if (engine_define->settings) + database_replicated_settings.loadFromQuery(*engine_define); + + return std::make_shared(database_name, metadata_path, uuid, + zookeeper_path, shard_name, replica_name, + std::move(database_replicated_settings), context); } #if USE_LIBPQXX diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 441880ae616..12cff3407d3 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -63,11 +63,13 @@ DatabaseReplicated::DatabaseReplicated( const String & zookeeper_path_, const String & shard_name_, const String & replica_name_, + DatabaseReplicatedSettings db_settings_, const Context & context_) : DatabaseAtomic(name_, metadata_path_, uuid, "DatabaseReplicated (" + name_ + ")", context_) , zookeeper_path(zookeeper_path_) , shard_name(shard_name_) , replica_name(replica_name_) + , db_settings(std::move(db_settings_)) { if (zookeeper_path.empty() || shard_name.empty() || 
replica_name.empty()) throw Exception("ZooKeeper path, shard and replica names must be non-empty", ErrorCodes::BAD_ARGUMENTS); @@ -141,7 +143,8 @@ ClusterPtr DatabaseReplicated::getCluster() const break; } if (!success) - throw Exception(ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get consistent cluster snapshot"); + throw Exception(ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get consistent cluster snapshot, " + "because replicas are created or removed concurrently"); assert(!hosts.empty()); assert(hosts.size() == host_ids.size()); @@ -172,7 +175,7 @@ ClusterPtr DatabaseReplicated::getCluster() const return std::make_shared(global_context.getSettingsRef(), shards, username, password, global_context.getTCPPort(), false); } -void DatabaseReplicated::tryConnectToZooKeeper(bool force_attach) +void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(bool force_attach) { try { @@ -228,6 +231,9 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/counter", "", zkutil::CreateMode::Persistent)); + /// We create and remove counter/cnt- node to increment sequential number of counter/ node and make log entry numbers start from 1. + /// New replicas are created with log pointer equal to 0 and log pointer is a number of the last executed entry. + /// It means that we cannot have log entry with number 0. 
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/counter/cnt-", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/counter/cnt-", -1)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", "", zkutil::CreateMode::Persistent)); @@ -253,10 +259,7 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt auto host_id = getHostID(global_context, db_uuid); /// On replica creation add empty entry to log. Can be used to trigger some actions on other replicas (e.g. update cluster info). - DDLLogEntry entry; - entry.hosts = {}; - entry.query = {}; - entry.initiator = {}; + DDLLogEntry entry{}; String query_path_prefix = zookeeper_path + "/log/query-"; String counter_prefix = zookeeper_path + "/counter/cnt-"; @@ -273,7 +276,7 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt void DatabaseReplicated::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) { - tryConnectToZooKeeper(force_attach); + tryConnectToZooKeeperAndInitDatabase(force_attach); DatabaseAtomic::loadStoredObjects(context, has_force_restore_data_flag, force_attach); @@ -281,7 +284,7 @@ void DatabaseReplicated::loadStoredObjects(Context & context, bool has_force_res ddl_worker->startup(); } -BlockIO DatabaseReplicated::propose(const ASTPtr & query, const Context & query_context) +BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, const Context & query_context) { if (is_readonly) throw Exception(ErrorCodes::NO_ZOOKEEPER, "Database is in readonly mode, because it cannot connect to ZooKeeper"); @@ -405,7 +408,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep String db_name = getDatabaseName(); String to_db_name = getDatabaseName() + BROKEN_TABLES_SUFFIX; - if (total_tables < tables_to_detach.size() * 2) + if (total_tables * db_settings.max_broken_tables_ratio < 
tables_to_detach.size()) throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "Too many tables to recreate: {} of {}", tables_to_detach.size(), total_tables); else if (!tables_to_detach.empty()) { @@ -594,12 +597,12 @@ void DatabaseReplicated::shutdown() void DatabaseReplicated::dropTable(const Context & context, const String & table_name, bool no_delay) { - auto txn = context.getMetadataTransaction(); + auto txn = context.getZooKeeperMetadataTransaction(); assert(!ddl_worker->isCurrentlyActive() || txn); - if (txn && txn->is_initial_query) + if (txn && txn->isInitialQuery()) { String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name); - txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path, -1)); + txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1)); } DatabaseAtomic::dropTable(context, table_name, no_delay); } @@ -607,10 +610,10 @@ void DatabaseReplicated::dropTable(const Context & context, const String & table void DatabaseReplicated::renameTable(const Context & context, const String & table_name, IDatabase & to_database, const String & to_table_name, bool exchange, bool dictionary) { - auto txn = context.getMetadataTransaction(); + auto txn = context.getZooKeeperMetadataTransaction(); assert(txn); - if (txn->is_initial_query) + if (txn->isInitialQuery()) { if (this != &to_database) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Moving tables between databases is not supported for Replicated engine"); @@ -622,16 +625,16 @@ void DatabaseReplicated::renameTable(const Context & context, const String & tab throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", to_table_name); String statement = readMetadataFile(table_name); - String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(table_name); - String metadata_zk_path_to = txn->zookeeper_path + "/metadata/" + escapeForFileName(to_table_name); - txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path, -1)); + String 
metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name); + String metadata_zk_path_to = zookeeper_path + "/metadata/" + escapeForFileName(to_table_name); + txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1)); if (exchange) { String statement_to = readMetadataFile(to_table_name); - txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path_to, -1)); - txn->ops.emplace_back(zkutil::makeCreateRequest(metadata_zk_path, statement_to, zkutil::CreateMode::Persistent)); + txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path_to, -1)); + txn->addOp(zkutil::makeCreateRequest(metadata_zk_path, statement_to, zkutil::CreateMode::Persistent)); } - txn->ops.emplace_back(zkutil::makeCreateRequest(metadata_zk_path_to, statement, zkutil::CreateMode::Persistent)); + txn->addOp(zkutil::makeCreateRequest(metadata_zk_path_to, statement, zkutil::CreateMode::Persistent)); } DatabaseAtomic::renameTable(context, table_name, to_database, to_table_name, exchange, dictionary); @@ -641,14 +644,14 @@ void DatabaseReplicated::commitCreateTable(const ASTCreateQuery & query, const S const String & table_metadata_tmp_path, const String & table_metadata_path, const Context & query_context) { - auto txn = query_context.getMetadataTransaction(); + auto txn = query_context.getZooKeeperMetadataTransaction(); assert(!ddl_worker->isCurrentlyActive() || txn); - if (txn && txn->is_initial_query) + if (txn && txn->isInitialQuery()) { - String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(query.table); + String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(query.table); String statement = getObjectDefinitionFromCreateQuery(query.clone()); /// zk::multi(...) 
will throw if `metadata_zk_path` exists - txn->ops.emplace_back(zkutil::makeCreateRequest(metadata_zk_path, statement, zkutil::CreateMode::Persistent)); + txn->addOp(zkutil::makeCreateRequest(metadata_zk_path, statement, zkutil::CreateMode::Persistent)); } DatabaseAtomic::commitCreateTable(query, table, table_metadata_tmp_path, table_metadata_path, query_context); } @@ -657,11 +660,11 @@ void DatabaseReplicated::commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path, const String & statement, const Context & query_context) { - auto txn = query_context.getMetadataTransaction(); - if (txn && txn->is_initial_query) + auto txn = query_context.getZooKeeperMetadataTransaction(); + if (txn && txn->isInitialQuery()) { - String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(table_id.table_name); - txn->ops.emplace_back(zkutil::makeSetRequest(metadata_zk_path, statement, -1)); + String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_id.table_name); + txn->addOp(zkutil::makeSetRequest(metadata_zk_path, statement, -1)); } DatabaseAtomic::commitAlterTable(table_id, table_metadata_tmp_path, table_metadata_path, statement, query_context); } @@ -670,37 +673,37 @@ void DatabaseReplicated::createDictionary(const Context & context, const String & dictionary_name, const ASTPtr & query) { - auto txn = context.getMetadataTransaction(); + auto txn = context.getZooKeeperMetadataTransaction(); assert(!ddl_worker->isCurrentlyActive() || txn); - if (txn && txn->is_initial_query) + if (txn && txn->isInitialQuery()) { - String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(dictionary_name); + String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(dictionary_name); String statement = getObjectDefinitionFromCreateQuery(query->clone()); - txn->ops.emplace_back(zkutil::makeCreateRequest(metadata_zk_path, statement, 
zkutil::CreateMode::Persistent)); + txn->addOp(zkutil::makeCreateRequest(metadata_zk_path, statement, zkutil::CreateMode::Persistent)); } DatabaseAtomic::createDictionary(context, dictionary_name, query); } void DatabaseReplicated::removeDictionary(const Context & context, const String & dictionary_name) { - auto txn = context.getMetadataTransaction(); + auto txn = context.getZooKeeperMetadataTransaction(); assert(!ddl_worker->isCurrentlyActive() || txn); - if (txn && txn->is_initial_query) + if (txn && txn->isInitialQuery()) { String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(dictionary_name); - txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path, -1)); + txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1)); } DatabaseAtomic::removeDictionary(context, dictionary_name); } void DatabaseReplicated::detachTablePermanently(const Context & context, const String & table_name) { - auto txn = context.getMetadataTransaction(); + auto txn = context.getZooKeeperMetadataTransaction(); assert(!ddl_worker->isCurrentlyActive() || txn); - if (txn && txn->is_initial_query) + if (txn && txn->isInitialQuery()) { String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name); - txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path, -1)); + txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1)); } DatabaseAtomic::detachTablePermanently(context, table_name); } diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h index a3a53e02ee4..fde53cf2c29 100644 --- a/src/Databases/DatabaseReplicated.h +++ b/src/Databases/DatabaseReplicated.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -22,13 +23,14 @@ class DatabaseReplicated : public DatabaseAtomic public: DatabaseReplicated(const String & name_, const String & metadata_path_, UUID uuid, const String & zookeeper_path_, const String & shard_name_, const String & replica_name_, + 
DatabaseReplicatedSettings db_settings_, const Context & context); ~DatabaseReplicated() override; String getEngineName() const override { return "Replicated"; } - /// If current query is initial, then the following methods add metadata updating ZooKeeper operations to current MetadataTransaction. + /// If current query is initial, then the following methods add metadata updating ZooKeeper operations to current ZooKeeperMetadataTransaction. void dropTable(const Context &, const String & table_name, bool no_delay) override; void renameTable(const Context & context, const String & table_name, IDatabase & to_database, const String & to_table_name, bool exchange, bool dictionary) override; @@ -46,7 +48,7 @@ public: /// Try to execute DLL query on current host as initial query. If query is succeed, /// then it will be executed on all replicas. - BlockIO propose(const ASTPtr & query, const Context & query_context); + BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, const Context & query_context); void stopReplication(); @@ -64,7 +66,7 @@ public: friend struct DatabaseReplicatedTask; friend class DatabaseReplicatedDDLWorker; private: - void tryConnectToZooKeeper(bool force_attach); + void tryConnectToZooKeeperAndInitDatabase(bool force_attach); bool createDatabaseNodesInZooKeeper(const ZooKeeperPtr & current_zookeeper); void createReplicaNodesInZooKeeper(const ZooKeeperPtr & current_zookeeper); @@ -78,6 +80,7 @@ private: String shard_name; String replica_name; String replica_path; + DatabaseReplicatedSettings db_settings; zkutil::ZooKeeperPtr getZooKeeper() const; diff --git a/src/Databases/DatabaseReplicatedSettings.cpp b/src/Databases/DatabaseReplicatedSettings.cpp new file mode 100644 index 00000000000..61febcf2810 --- /dev/null +++ b/src/Databases/DatabaseReplicatedSettings.cpp @@ -0,0 +1,23 @@ +#include +#include +#include + +namespace DB +{ + +IMPLEMENT_SETTINGS_TRAITS(DatabaseReplicatedSettingsTraits, LIST_OF_DATABASE_REPLICATED_SETTINGS) + +void 
DatabaseReplicatedSettings::loadFromQuery(ASTStorage & storage_def) +{ + if (storage_def.settings) + { + applyChanges(storage_def.settings->changes); + return; + } + + auto settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + storage_def.set(storage_def.settings, settings_ast); +} + +} diff --git a/src/Databases/DatabaseReplicatedSettings.h b/src/Databases/DatabaseReplicatedSettings.h new file mode 100644 index 00000000000..11d5b3820e4 --- /dev/null +++ b/src/Databases/DatabaseReplicatedSettings.h @@ -0,0 +1,26 @@ +#pragma once +#include +#include + +namespace DB +{ + +class ASTStorage; + +#define LIST_OF_DATABASE_REPLICATED_SETTINGS(M) \ + M(Float, max_broken_tables_ratio, 0.5, "Do not recover replica automatically if the ratio of staled tables to all tables is greater", 0) \ + M(UInt64, max_replication_lag_to_enqueue, 10, "Replica will throw exception on attempt to execute query if its replication lag greater", 0) \ + M(UInt64, wait_entry_commited_timeout_sec, 3600, "Replicas will try to cancel query if timeout exceed, but initiator host has not executed it yet", 0) \ + +DECLARE_SETTINGS_TRAITS(DatabaseReplicatedSettingsTraits, LIST_OF_DATABASE_REPLICATED_SETTINGS) + + +/** Settings for the Replicated database engine. + * Could be loaded from a CREATE DATABASE query (SETTINGS clause). 
+ */ +struct DatabaseReplicatedSettings : public BaseSettings +{ + void loadFromQuery(ASTStorage & storage_def); +}; + +} diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index ff15878b136..e0c5717711c 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -30,7 +30,7 @@ void DatabaseReplicatedDDLWorker::initializeMainThread() { auto zookeeper = getAndSetZooKeeper(); if (database->is_readonly) - database->tryConnectToZooKeeper(false); + database->tryConnectToZooKeeperAndInitDatabase(false); initializeReplication(); initialized = true; return; @@ -98,8 +98,7 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr UInt32 our_log_ptr = parse(zookeeper->get(database->replica_path + "/log_ptr")); UInt32 max_log_ptr = parse(zookeeper->get(database->zookeeper_path + "/max_log_ptr")); assert(our_log_ptr <= max_log_ptr); - constexpr UInt32 max_replication_lag = 16; - if (max_replication_lag < max_log_ptr - our_log_ptr) + if (database->db_settings.max_replication_lag_to_enqueue < max_log_ptr - our_log_ptr) throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot enqueue query on this replica, " "because it has replication lag of {} queries. 
Try other replica.", max_log_ptr - our_log_ptr); @@ -131,7 +130,7 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr if (zookeeper->expired() || stop_flag) throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "ZooKeeper session expired or replication stopped, try again"); - processTask(*task); + processTask(*task, zookeeper); if (!task->was_executed) { @@ -139,7 +138,7 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr task->execution_status.code, task->execution_status.message); } - try_node->reset(); + try_node->setAlreadyRemoved(); return entry_path; } @@ -178,7 +177,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na /// Query is not committed yet. We cannot just skip it and execute next one, because reordering may break replication. LOG_TRACE(log, "Waiting for initiator {} to commit or rollback entry {}", initiator_name, entry_path); constexpr size_t wait_time_ms = 1000; - constexpr size_t max_iterations = 3600; + size_t max_iterations = database->db_settings.wait_entry_commited_timeout_sec; size_t iteration = 0; while (!wait_committed_or_failed->tryWait(wait_time_ms)) @@ -194,7 +193,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na if (max_iterations <= ++iteration) { /// What can we do if initiator hangs for some reason? Seems like we can remove /try node. - /// Initiator will fail to commit entry to ZK (including ops for replicated table) if /try does not exist. + /// Initiator will fail to commit ZooKeeperMetadataTransaction (including ops for replicated table) if /try does not exist. /// But it's questionable. /// We use tryRemove(...) because multiple hosts (including initiator) may try to do it concurrently. 
diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h index 1eafe2489e7..6dd8dc408d7 100644 --- a/src/Databases/DatabaseReplicatedWorker.h +++ b/src/Databases/DatabaseReplicatedWorker.h @@ -6,6 +6,18 @@ namespace DB class DatabaseReplicated; +/// It's similar to DDLWorker, but has the following differences: +/// 1. DDL queue in ZooKeeper is not shared between multiple clusters and databases, +/// each DatabaseReplicated has its own queue in ZooKeeper and DatabaseReplicatedDDLWorker object. +/// 2. Shards and replicas are identified by shard_name and replica_name arguments of database engine, +/// not by address:port pairs. Cluster (of multiple database replicas) is identified by its zookeeper_path. +/// 3. After creation of an entry in DDL queue initiator tries to execute the entry locally +/// and other hosts wait for query to finish on initiator host. +/// If query succeeds on initiator, then all hosts must execute it, so they will retry until query succeeds. +/// We assume that cluster is homogeneous, so if replicas are in consistent state and query succeeds on one host, +/// then all hosts can execute it (maybe after several retries). +/// 4. Each database replica stores its log pointer in ZooKeeper. Cleanup thread removes old entry +/// if its number < max_log_ptr - logs_to_keep. 
class DatabaseReplicatedDDLWorker : public DDLWorker { public: diff --git a/src/Databases/DatabaseWithDictionaries.cpp b/src/Databases/DatabaseWithDictionaries.cpp index 7ce5de56b64..d92f0f1897e 100644 --- a/src/Databases/DatabaseWithDictionaries.cpp +++ b/src/Databases/DatabaseWithDictionaries.cpp @@ -194,7 +194,7 @@ void DatabaseWithDictionaries::createDictionary(const Context & context, const S detachDictionary(dictionary_name); }); - auto txn = context.getMetadataTransaction(); + auto txn = context.getZooKeeperMetadataTransaction(); if (txn && !context.isInternalSubquery()) txn->commit(); /// Commit point (a sort of) for Replicated database @@ -219,7 +219,7 @@ void DatabaseWithDictionaries::removeDictionary(const Context & context, const S { String dictionary_metadata_path = getObjectMetadataPath(dictionary_name); - auto txn = context.getMetadataTransaction(); + auto txn = context.getZooKeeperMetadataTransaction(); if (txn && !context.isInternalSubquery()) txn->commit(); /// Commit point (a sort of) for Replicated database diff --git a/src/Databases/ya.make b/src/Databases/ya.make index 38f79532080..8bd3f291a64 100644 --- a/src/Databases/ya.make +++ b/src/Databases/ya.make @@ -17,6 +17,7 @@ SRCS( DatabaseOnDisk.cpp DatabaseOrdinary.cpp DatabaseReplicated.cpp + DatabaseReplicatedSettings.cpp DatabaseReplicatedWorker.cpp DatabaseWithDictionaries.cpp DatabasesCommon.cpp diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 766b14dea42..98e4a87fba3 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2553,14 +2553,14 @@ StorageID Context::resolveStorageIDImpl(StorageID storage_id, StorageNamespace w return StorageID::createEmpty(); } -void Context::initMetadataTransaction(MetadataTransactionPtr txn, [[maybe_unused]] bool attach_existing) +void Context::initZooKeeperMetadataTransaction(ZooKeeperMetadataTransactionPtr txn, [[maybe_unused]] bool attach_existing) { assert(!metadata_transaction); 
assert(attach_existing || query_context == this); metadata_transaction = std::move(txn); } -MetadataTransactionPtr Context::getMetadataTransaction() const +ZooKeeperMetadataTransactionPtr Context::getZooKeeperMetadataTransaction() const { assert(!metadata_transaction || hasQueryContext()); return metadata_transaction; diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 24d0eb4b0de..563fb172488 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -117,8 +117,8 @@ using VolumePtr = std::shared_ptr; struct NamedSession; struct BackgroundTaskSchedulingSettings; -struct MetadataTransaction; -using MetadataTransactionPtr = std::shared_ptr; +class ZooKeeperMetadataTransaction; +using ZooKeeperMetadataTransactionPtr = std::shared_ptr; #if USE_EMBEDDED_COMPILER class CompiledExpressionCache; @@ -281,7 +281,7 @@ private: /// to be customized in HTTP and TCP servers by overloading the customizeContext(DB::Context&) /// methods. - MetadataTransactionPtr metadata_transaction; /// Distributed DDL context. I'm not sure if it's a suitable place for this, + ZooKeeperMetadataTransactionPtr metadata_transaction; /// Distributed DDL context. I'm not sure if it's a suitable place for this, /// but it's the easiest way to pass this through the whole stack from executeQuery(...) /// to DatabaseOnDisk::commitCreateTable(...) or IStorage::alter(...) without changing /// thousands of signatures. @@ -746,8 +746,10 @@ public: IHostContextPtr & getHostContext(); const IHostContextPtr & getHostContext() const; - void initMetadataTransaction(MetadataTransactionPtr txn, bool attach_existing = false); - MetadataTransactionPtr getMetadataTransaction() const; + /// Initialize context of distributed DDL query with Replicated database. + void initZooKeeperMetadataTransaction(ZooKeeperMetadataTransactionPtr txn, bool attach_existing = false); + /// Returns context of current distributed DDL query or nullptr. 
+ ZooKeeperMetadataTransactionPtr getZooKeeperMetadataTransaction() const; struct MySQLWireContext { diff --git a/src/Interpreters/DDLTask.cpp b/src/Interpreters/DDLTask.cpp index 7f47f0a6659..4be465d3de4 100644 --- a/src/Interpreters/DDLTask.cpp +++ b/src/Interpreters/DDLTask.cpp @@ -96,7 +96,7 @@ void DDLTaskBase::parseQueryFromEntry(const Context & context) query = parseQuery(parser_query, begin, end, description, 0, context.getSettingsRef().max_parser_depth); } -std::unique_ptr DDLTaskBase::makeQueryContext(Context & from_context) +std::unique_ptr DDLTaskBase::makeQueryContext(Context & from_context, const ZooKeeperPtr & /*zookeeper*/) { auto query_context = std::make_unique(from_context); query_context->makeQueryContext(); @@ -293,28 +293,26 @@ String DatabaseReplicatedTask::getShardID() const return database->shard_name; } -std::unique_ptr DatabaseReplicatedTask::makeQueryContext(Context & from_context) +std::unique_ptr DatabaseReplicatedTask::makeQueryContext(Context & from_context, const ZooKeeperPtr & zookeeper) { - auto query_context = DDLTaskBase::makeQueryContext(from_context); + auto query_context = DDLTaskBase::makeQueryContext(from_context, zookeeper); query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; query_context->setCurrentDatabase(database->getDatabaseName()); - auto txn = std::make_shared(); - query_context->initMetadataTransaction(txn); - txn->current_zookeeper = from_context.getZooKeeper(); - txn->zookeeper_path = database->zookeeper_path; - txn->is_initial_query = is_initial_query; + auto txn = std::make_shared(zookeeper, database->zookeeper_path, is_initial_query); + query_context->initZooKeeperMetadataTransaction(txn); if (is_initial_query) { - txn->ops.emplace_back(zkutil::makeRemoveRequest(entry_path + "/try", -1)); - txn->ops.emplace_back(zkutil::makeCreateRequest(entry_path + "/committed", host_id_str, zkutil::CreateMode::Persistent)); - 
txn->ops.emplace_back(zkutil::makeSetRequest(database->zookeeper_path + "/max_log_ptr", toString(getLogEntryNumber(entry_name)), -1)); + txn->addOp(zkutil::makeRemoveRequest(entry_path + "/try", -1)); + txn->addOp(zkutil::makeCreateRequest(entry_path + "/committed", host_id_str, zkutil::CreateMode::Persistent)); + txn->addOp(zkutil::makeSetRequest(database->zookeeper_path + "/max_log_ptr", toString(getLogEntryNumber(entry_name)), -1)); } - txn->ops.emplace_back(zkutil::makeSetRequest(database->replica_path + "/log_ptr", toString(getLogEntryNumber(entry_name)), -1)); + txn->addOp(zkutil::makeSetRequest(database->replica_path + "/log_ptr", toString(getLogEntryNumber(entry_name)), -1)); - std::move(ops.begin(), ops.end(), std::back_inserter(txn->ops)); + for (auto & op : ops) + txn->addOp(std::move(op)); ops.clear(); return query_context; @@ -335,7 +333,7 @@ UInt32 DDLTaskBase::getLogEntryNumber(const String & log_entry_name) return parse(log_entry_name.substr(strlen(name))); } -void MetadataTransaction::commit() +void ZooKeeperMetadataTransaction::commit() { assert(state == CREATED); state = FAILED; diff --git a/src/Interpreters/DDLTask.h b/src/Interpreters/DDLTask.h index f02e17103aa..18c1f4c80cd 100644 --- a/src/Interpreters/DDLTask.h +++ b/src/Interpreters/DDLTask.h @@ -20,8 +20,8 @@ class ASTQueryWithOnCluster; using ZooKeeperPtr = std::shared_ptr; class DatabaseReplicated; -struct MetadataTransaction; -using MetadataTransactionPtr = std::shared_ptr; +class ZooKeeperMetadataTransaction; +using ZooKeeperMetadataTransactionPtr = std::shared_ptr; struct HostID { @@ -95,7 +95,7 @@ struct DDLTaskBase virtual String getShardID() const = 0; - virtual std::unique_ptr makeQueryContext(Context & from_context); + virtual std::unique_ptr makeQueryContext(Context & from_context, const ZooKeeperPtr & zookeeper); inline String getActiveNodePath() const { return entry_path + "/active/" + host_id_str; } inline String getFinishedNodePath() const { return entry_path + "/finished/" 
+ host_id_str; } @@ -132,13 +132,19 @@ struct DatabaseReplicatedTask : public DDLTaskBase DatabaseReplicatedTask(const String & name, const String & path, DatabaseReplicated * database_); String getShardID() const override; - std::unique_ptr makeQueryContext(Context & from_context) override; + std::unique_ptr makeQueryContext(Context & from_context, const ZooKeeperPtr & zookeeper) override; DatabaseReplicated * database; }; - -struct MetadataTransaction +/// The main purpose of ZooKeeperMetadataTransaction is to execute all ZooKeeper operations related to a query +/// in a single transaction once we have performed all required checks and are ready to "commit" changes. +/// For example, create ALTER_METADATA entry in ReplicatedMergeTree log, +/// create path/to/entry/finished/host_id node in distributed DDL queue to mark query as executed and +/// update metadata in path/to/replicated_database/metadata/table_name +/// It's used for DatabaseReplicated. +/// TODO we can also use it for ordinary ON CLUSTER queries +class ZooKeeperMetadataTransaction { enum State { @@ -153,8 +159,29 @@ struct MetadataTransaction bool is_initial_query; Coordination::Requests ops; +public: + ZooKeeperMetadataTransaction(const ZooKeeperPtr & current_zookeeper_, const String & zookeeper_path_, bool is_initial_query_) + : current_zookeeper(current_zookeeper_) + , zookeeper_path(zookeeper_path_) + , is_initial_query(is_initial_query_) + { + } + + bool isInitialQuery() const { return is_initial_query; } + + bool isExecuted() const { return state != CREATED; } + + String getDatabaseZooKeeperPath() const { return zookeeper_path; } + + void addOp(Coordination::RequestPtr && op) + { + assert(!isExecuted()); + ops.emplace_back(std::move(op)); + } + void moveOpsTo(Coordination::Requests & other_ops) { + assert(!isExecuted()); std::move(ops.begin(), ops.end(), std::back_inserter(other_ops)); ops.clear(); state = COMMITTED; @@ -162,7 +189,7 @@ struct MetadataTransaction void commit(); - ~MetadataTransaction() { assert(state 
!= CREATED || std::uncaught_exception()); } + ~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exceptions()); } }; } diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 12fd03b3b70..67f716c235c 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -195,16 +195,15 @@ void DDLWorker::startup() void DDLWorker::shutdown() { - stop_flag = true; - queue_updated_event->set(); - cleanup_event->set(); - - if (main_thread.joinable()) + bool prev_stop_flag = stop_flag.exchange(true); + if (!prev_stop_flag) + { + queue_updated_event->set(); + cleanup_event->set(); main_thread.join(); - if (cleanup_thread.joinable()) cleanup_thread.join(); - - worker_pool.reset(); + worker_pool.reset(); + } } DDLWorker::~DDLWorker() @@ -267,6 +266,8 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r } /// Stage 2: resolve host_id and check if we should execute query or not + /// Multiple clusters can use a single DDL queue path in ZooKeeper, + /// so we should skip the task if we cannot find the current host in the cluster hosts list. 
if (!task->findCurrentHostID(context, log)) { out_reason = "There is no a local address in host list"; @@ -317,7 +318,7 @@ void DDLWorker::scheduleTasks() bool status_written = zookeeper->exists(task->getFinishedNodePath()); if (task->was_executed && !status_written && task_still_exists) { - processTask(*task); + processTask(*task, zookeeper); } } @@ -364,15 +365,15 @@ void DDLWorker::scheduleTasks() if (worker_pool) { - worker_pool->scheduleOrThrowOnError([this, &saved_task]() + worker_pool->scheduleOrThrowOnError([this, &saved_task, zookeeper]() { setThreadName("DDLWorkerExec"); - processTask(saved_task); + processTask(saved_task, zookeeper); }); } else { - processTask(saved_task); + processTask(saved_task, zookeeper); } } } @@ -385,7 +386,7 @@ DDLTaskBase & DDLWorker::saveTask(DDLTaskPtr && task) return *current_tasks.back(); } -bool DDLWorker::tryExecuteQuery(const String & query, DDLTaskBase & task) +bool DDLWorker::tryExecuteQuery(const String & query, DDLTaskBase & task, const ZooKeeperPtr & zookeeper) { /// Add special comment at the start of query to easily identify DDL-produced queries in query_log String query_prefix = "/* ddl_entry=" + task.entry_name + " */ "; @@ -398,14 +399,16 @@ bool DDLWorker::tryExecuteQuery(const String & query, DDLTaskBase & task) try { - auto query_context = task.makeQueryContext(context); + auto query_context = task.makeQueryContext(context, zookeeper); if (!task.is_initial_query) query_scope.emplace(*query_context); executeQuery(istr, ostr, !task.is_initial_query, *query_context, {}); - if (auto txn = query_context->getMetadataTransaction()) + if (auto txn = query_context->getZooKeeperMetadataTransaction()) { - if (txn->state == MetadataTransaction::CREATED) + /// Most queries commit changes to ZooKeeper right before applying local changes, + /// but some queries do not support it, so we have to do it here. 
+ if (!txn->isExecuted()) txn->commit(); } } @@ -463,10 +466,8 @@ void DDLWorker::updateMaxDDLEntryID(const String & entry_name) } } -void DDLWorker::processTask(DDLTaskBase & task) +void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper) { - auto zookeeper = tryGetZooKeeper(); - LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query); String active_node_path = task.getActiveNodePath(); @@ -541,7 +542,7 @@ void DDLWorker::processTask(DDLTaskBase & task) else { storage.reset(); - tryExecuteQuery(rewritten_query, task); + tryExecuteQuery(rewritten_query, task, zookeeper); } } catch (const Coordination::Exception &) @@ -565,7 +566,7 @@ void DDLWorker::processTask(DDLTaskBase & task) } else { - /// task.ops where not executed by table or database engine, se DDLWorker is responsible for + /// task.ops were not executed by table or database engine, so DDLWorker is responsible for /// writing query execution status into ZooKeeper. task.ops.emplace_back(zkutil::makeSetRequest(finished_node_path, task.execution_status.serializeText(), -1)); } @@ -589,7 +590,7 @@ void DDLWorker::processTask(DDLTaskBase & task) } /// Active node was removed in multi ops - active_node->reset(); + active_node->setAlreadyRemoved(); task.completely_processed = true; } @@ -712,7 +713,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica( /// If the leader will unexpectedly changed this method will return false /// and on the next iteration new leader will take lock - if (tryExecuteQuery(rewritten_query, task)) + if (tryExecuteQuery(rewritten_query, task, zookeeper)) { executed_by_us = true; break; diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index c39a832c098..8b0a8f038a0 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -77,7 +77,7 @@ protected: /// Returns non-empty DDLTaskPtr if entry parsed and the check is passed virtual DDLTaskPtr initAndCheckTask(const String & entry_name, String & 
out_reason, const ZooKeeperPtr & zookeeper); - void processTask(DDLTaskBase & task); + void processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper); void updateMaxDDLEntryID(const String & entry_name); /// Check that query should be executed on leader replica only @@ -95,7 +95,7 @@ protected: const String & node_path, const ZooKeeperPtr & zookeeper); - bool tryExecuteQuery(const String & query, DDLTaskBase & task); + bool tryExecuteQuery(const String & query, DDLTaskBase & task, const ZooKeeperPtr & zookeeper); /// Checks and cleanups queue's nodes void cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper); diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 402f05895bc..bf624507574 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -54,7 +54,7 @@ BlockIO InterpreterAlterQuery::execute() { auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name); guard->releaseTableLock(); - return typeid_cast(database.get())->propose(query_ptr, context); + return typeid_cast(database.get())->tryEnqueueReplicatedDDL(query_ptr, context); } StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context); diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 2021c1f1d60..2b1dddde78c 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -880,7 +880,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) { assertOrSetUUID(create, database); guard->releaseTableLock(); - return typeid_cast(database.get())->propose(query_ptr, context); + return typeid_cast(database.get())->tryEnqueueReplicatedDDL(query_ptr, context); } } @@ -1092,7 +1092,7 @@ BlockIO InterpreterCreateQuery::createDictionary(ASTCreateQuery & create) if (!create.attach) assertOrSetUUID(create, database); 
guard->releaseTableLock(); - return typeid_cast(database.get())->propose(query_ptr, context); + return typeid_cast(database.get())->tryEnqueueReplicatedDDL(query_ptr, context); } if (database->isDictionaryExist(dictionary_name)) diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index 9e63c647f71..33e93a79c41 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -146,7 +146,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat ddl_guard->releaseTableLock(); table.reset(); - return typeid_cast(database.get())->propose(query.clone(), context); + return typeid_cast(database.get())->tryEnqueueReplicatedDDL(query.clone(), context); } if (query.kind == ASTDropQuery::Kind::Detach) @@ -231,7 +231,7 @@ BlockIO InterpreterDropQuery::executeToDictionary( context.checkAccess(AccessType::DROP_DICTIONARY, database_name, dictionary_name); ddl_guard->releaseTableLock(); - return typeid_cast(database.get())->propose(query_ptr, context); + return typeid_cast(database.get())->tryEnqueueReplicatedDDL(query_ptr, context); } if (!database || !database->isDictionaryExist(dictionary_name)) diff --git a/src/Interpreters/InterpreterRenameQuery.cpp b/src/Interpreters/InterpreterRenameQuery.cpp index b9d7faac73c..923a342d9ea 100644 --- a/src/Interpreters/InterpreterRenameQuery.cpp +++ b/src/Interpreters/InterpreterRenameQuery.cpp @@ -90,7 +90,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c UniqueTableName to(elem.to_database_name, elem.to_table_name); ddl_guards[from]->releaseTableLock(); ddl_guards[to]->releaseTableLock(); - return typeid_cast(database.get())->propose(query_ptr, context); + return typeid_cast(database.get())->tryEnqueueReplicatedDDL(query_ptr, context); } else { diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 32317968fe5..325bf3d2f74 100644 --- 
a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -212,11 +212,11 @@ static void executeDropQuery(ASTDropQuery::Kind kind, const Context & global_con /// looks like expected behaviour and we have tests for it. auto drop_context = Context(global_context); drop_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - if (auto txn = current_context.getMetadataTransaction()) + if (auto txn = current_context.getZooKeeperMetadataTransaction()) { /// For Replicated database drop_context.setQueryContext(const_cast(current_context)); - drop_context.initMetadataTransaction(txn, true); + drop_context.initZooKeeperMetadataTransaction(txn, true); } InterpreterDropQuery drop_interpreter(ast_drop_query, drop_context); drop_interpreter.execute(); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ff39bf91fbb..f2c88cdedd9 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4282,12 +4282,12 @@ void StorageReplicatedMergeTree::alter( zkutil::makeCreateRequest(mutations_path + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential)); } - if (auto txn = query_context.getMetadataTransaction()) + if (auto txn = query_context.getZooKeeperMetadataTransaction()) { txn->moveOpsTo(ops); /// NOTE: IDatabase::alterTable(...) is called when executing ALTER_METADATA queue entry without query context, /// so we have to update metadata of DatabaseReplicated here. 
- String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(table_id.table_name); + String metadata_zk_path = txn->getDatabaseZooKeeperPath() + "/metadata/" + escapeForFileName(table_id.table_name); auto ast = DatabaseCatalog::instance().getDatabase(table_id.database_name)->getCreateTableQuery(table_id.table_name, query_context); applyMetadataChangesToCreateQuery(ast, future_metadata); ops.emplace_back(zkutil::makeSetRequest(metadata_zk_path, getObjectDefinitionFromCreateQuery(ast), -1)); @@ -5262,7 +5262,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const requests.emplace_back(zkutil::makeCreateRequest( mutations_path + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential)); - if (auto txn = query_context.getMetadataTransaction()) + if (auto txn = query_context.getZooKeeperMetadataTransaction()) txn->moveOpsTo(requests); Coordination::Responses responses; @@ -5766,7 +5766,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( } } - if (auto txn = context.getMetadataTransaction()) + if (auto txn = context.getZooKeeperMetadataTransaction()) txn->moveOpsTo(ops); ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); /// Just update version @@ -6269,7 +6269,7 @@ bool StorageReplicatedMergeTree::dropAllPartsInPartition( Coordination::Requests ops; ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); /// Just update version. 
- if (auto txn = query_context.getMetadataTransaction()) + if (auto txn = query_context.getZooKeeperMetadataTransaction()) txn->moveOpsTo(ops); Coordination::Responses responses = zookeeper.multi(ops); diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index f08a41e32b8..e6bb3747fb0 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -108,6 +108,7 @@ "memory_tracking", "memory_usage", "live_view", + "00825_protobuf_format_map", "00152_insert_different_granularity", "01715_background_checker_blather_zookeeper", "01714_alter_drop_version",