From 18f6b5bbad353431e5f7494103756264b0f2ca79 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 4 Feb 2021 22:41:44 +0300 Subject: [PATCH] add timeouts --- src/Databases/DatabaseReplicated.cpp | 40 +++++---- src/Databases/DatabaseReplicated.h | 3 +- src/Databases/DatabaseReplicatedWorker.cpp | 90 ++++++++++++++++----- src/Databases/DatabaseReplicatedWorker.h | 2 +- src/Interpreters/DDLTask.cpp | 4 - src/Interpreters/DDLTask.h | 2 +- src/Interpreters/DDLWorker.cpp | 18 ++--- src/Interpreters/DDLWorker.h | 2 +- src/Interpreters/DatabaseCatalog.cpp | 2 +- src/Interpreters/DatabaseCatalog.h | 4 +- src/Interpreters/InterpreterAlterQuery.cpp | 2 +- src/Interpreters/InterpreterCreateQuery.cpp | 2 +- src/Interpreters/InterpreterDropQuery.cpp | 4 +- src/Interpreters/InterpreterRenameQuery.cpp | 2 +- tests/queries/skip_list.json | 21 +++++ 15 files changed, 139 insertions(+), 59 deletions(-) diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 44746cd5716..5a11787331c 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -35,6 +35,7 @@ namespace ErrorCodes extern const int DATABASE_REPLICATION_FAILED; extern const int UNKNOWN_DATABASE; extern const int NOT_IMPLEMENTED; + extern const int INCORRECT_QUERY; } zkutil::ZooKeeperPtr DatabaseReplicated::getZooKeeper() const @@ -121,8 +122,8 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/counter/cnt-", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/counter/cnt-", -1)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/min_log_ptr", "1", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/max_log_ptr", "1", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/logs_to_keep", "1000", zkutil::CreateMode::Persistent)); Coordination::Responses responses; auto res = current_zookeeper->tryMulti(ops, responses); @@ -194,7 +195,7 @@ void DatabaseReplicated::onUnexpectedLogEntry(const String & entry_name, const Z throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry {} already executed, current pointer is {}", entry_number, log_entry_to_execute); /// Entry name is valid. Let's get min log pointer to check if replica is staled. - UInt32 min_snapshot = parse(zookeeper->get(zookeeper_path + "/min_log_ptr")); + UInt32 min_snapshot = parse(zookeeper->get(zookeeper_path + "/min_log_ptr")); // FIXME if (log_entry_to_execute < min_snapshot) { @@ -207,13 +208,15 @@ void DatabaseReplicated::onUnexpectedLogEntry(const String & entry_name, const Z } -BlockIO DatabaseReplicated::propose(const ASTPtr & query) +BlockIO DatabaseReplicated::propose(const ASTPtr & query, const Context & query_context) { + if (query_context.getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY) + throw Exception(ErrorCodes::INCORRECT_QUERY, "It's not initial query. ON CLUSTER is not allowed for Replicated database."); + if (const auto * query_alter = query->as()) { for (const auto & command : query_alter->command_list->children) { - //FIXME allow all types of queries (maybe we should execute ATTACH an similar queries on leader) if (!isSupportedAlterType(command->as().type)) throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED); } @@ -225,17 +228,16 @@ BlockIO DatabaseReplicated::propose(const ASTPtr & query) DDLLogEntry entry; entry.query = queryToString(query); entry.initiator = ddl_worker->getCommonHostID(); - String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry); + String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context); BlockIO io; - //FIXME use query context - if (global_context.getSettingsRef().distributed_ddl_task_timeout == 0) + if (query_context.getSettingsRef().distributed_ddl_task_timeout == 0) return io; //FIXME need list of all replicas, we can obtain it from zk Strings hosts_to_wait; hosts_to_wait.emplace_back(getFullReplicaName()); - auto stream = std::make_shared(node_path, entry, global_context, hosts_to_wait); + auto stream = std::make_shared(node_path, entry, query_context, hosts_to_wait); io.in = std::move(stream); return io; } @@ -295,17 +297,20 @@ void DatabaseReplicated::drop(const Context & context_) { auto current_zookeeper = getZooKeeper(); current_zookeeper->set(replica_path, "DROPPED"); - current_zookeeper->tryRemoveRecursive(replica_path); DatabaseAtomic::drop(context_); + current_zookeeper->tryRemoveRecursive(replica_path); +} + +void DatabaseReplicated::stopReplication() +{ + if (ddl_worker) + ddl_worker->shutdown(); } void DatabaseReplicated::shutdown() { - if (ddl_worker) - { - ddl_worker->shutdown(); - ddl_worker = nullptr; - } + stopReplication(); + ddl_worker = nullptr; DatabaseAtomic::shutdown(); } @@ -330,10 +335,15 @@ void DatabaseReplicated::renameTable(const Context & context, const String & tab if (txn->is_initial_query) { + if (!isTableExist(table_name, context)) + throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", table_name); + if (exchange && !to_database.isTableExist(to_table_name, context)) + throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", to_table_name); + String statement; String statement_to; { - //FIXME It's not atomic (however we have only one thread) + /// NOTE It's not atomic (however, we have only one thread) ReadBufferFromFile in(getObjectMetadataPath(table_name), 4096); readStringUntilEOF(statement, in); if (exchange) diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h index 586f381c962..a866a61558c 100644 --- a/src/Databases/DatabaseReplicated.h +++ b/src/Databases/DatabaseReplicated.h @@ -60,8 +60,9 @@ public: String getEngineName() const override { return "Replicated"; } - BlockIO propose(const ASTPtr & query); + BlockIO propose(const ASTPtr & query, const Context & query_context); + void stopReplication(); void shutdown() override; void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) override; diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index 5af216c3d0d..1c000a8f0a7 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -9,6 +9,8 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int DATABASE_REPLICATION_FAILED; + extern const int NOT_A_LEADER; + extern const int UNFINISHED; } DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db, const Context & context_) @@ -22,7 +24,7 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db void DatabaseReplicatedDDLWorker::initializeMainThread() { - do + while (!initialized && !stop_flag) { try { @@ -36,17 +38,17 @@ void DatabaseReplicatedDDLWorker::initializeMainThread() sleepForSeconds(5); } } - while (!initialized && !stop_flag); } void DatabaseReplicatedDDLWorker::initializeReplication() { /// Check if we need to recover replica. - /// Invariant: replica is lost if it's log_ptr value is less then min_log_ptr value. + /// Invariant: replica is lost if it's log_ptr value is less then max_log_ptr - logs_to_keep. UInt32 our_log_ptr = parse(current_zookeeper->get(database->replica_path + "/log_ptr")); - UInt32 min_log_ptr = parse(current_zookeeper->get(database->zookeeper_path + "/min_log_ptr")); - if (our_log_ptr < min_log_ptr) + UInt32 max_log_ptr = parse(current_zookeeper->get(database->zookeeper_path + "/max_log_ptr")); + UInt32 logs_to_keep = parse(current_zookeeper->get(database->zookeeper_path + "/logs_to_keep")); + if (our_log_ptr + logs_to_keep < max_log_ptr) database->recoverLostReplica(current_zookeeper, 0); } @@ -75,10 +77,19 @@ String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry) return node_path; } -String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entry) +String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entry, const Context & query_context) { + /// NOTE Possibly it would be better to execute initial query on the most up-to-date node, + /// but it requires more complex logic around /try node. + auto zookeeper = getAndSetZooKeeper(); - // TODO do not enqueue query if we have big replication lag + UInt32 our_log_ptr = parse(zookeeper->get(database->replica_path + "/log_ptr")); + UInt32 max_log_ptr = parse(zookeeper->get(database->zookeeper_path + "/max_log_ptr")); + assert(our_log_ptr <= max_log_ptr); + constexpr UInt32 max_replication_lag = 16; + if (max_replication_lag < max_log_ptr - our_log_ptr) + throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot enqueue query on this replica, " + "because it has replication lag of {} queries. Try other replica.", max_log_ptr - our_log_ptr); String entry_path = enqueueQuery(entry); auto try_node = zkutil::EphemeralNodeHolder::existing(entry_path + "/try", *zookeeper); @@ -91,9 +102,18 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr task->is_initial_query = true; LOG_DEBUG(log, "Waiting for worker thread to process all entries before {}", entry_name); + UInt64 timeout = query_context.getSettingsRef().distributed_ddl_task_timeout; { std::unique_lock lock{mutex}; - wait_current_task_change.wait(lock, [&]() { assert(zookeeper->expired() || current_task <= entry_name); return zookeeper->expired() || current_task == entry_name; }); + bool processed = wait_current_task_change.wait_for(lock, std::chrono::seconds(timeout), [&]() + { + assert(zookeeper->expired() || current_task <= entry_name); + return zookeeper->expired() || current_task == entry_name || stop_flag; + }); + + if (!processed) + throw Exception(ErrorCodes::UNFINISHED, "Timeout: Cannot enqueue query on this replica," + "most likely because replica is busy with previous queue entries"); } if (zookeeper->expired()) @@ -116,8 +136,11 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na { { std::lock_guard lock{mutex}; - current_task = entry_name; - wait_current_task_change.notify_all(); + if (current_task < entry_name) + { + current_task = entry_name; + wait_current_task_change.notify_all(); + } } UInt32 our_log_ptr = parse(current_zookeeper->get(database->replica_path + "/log_ptr")); @@ -135,18 +158,50 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na String initiator_name; zkutil::EventPtr wait_committed_or_failed = std::make_shared(); - if (zookeeper->tryGet(entry_path + "/try", initiator_name, nullptr, wait_committed_or_failed)) + String try_node_path = entry_path + "/try"; + if (zookeeper->tryGet(try_node_path, initiator_name, nullptr, wait_committed_or_failed)) { task->is_initial_query = initiator_name == task->host_id_str; + /// Query is not committed yet. We cannot just skip it and execute next one, because reordering may break replication. - //FIXME add some timeouts LOG_TRACE(log, "Waiting for initiator {} to commit or rollback entry {}", initiator_name, entry_path); - wait_committed_or_failed->wait(); + constexpr size_t wait_time_ms = 1000; + constexpr size_t max_iterations = 3600; + size_t iteration = 0; + + while (!wait_committed_or_failed->tryWait(wait_time_ms)) + { + if (stop_flag) + { + /// We cannot return task to process and we cannot return nullptr too, + /// because nullptr means "task should not be executed". + /// We can only exit by exception. + throw Exception(ErrorCodes::UNFINISHED, "Replication was stopped"); + } + + if (max_iterations <= ++iteration) + { + /// What can we do if initiator hangs for some reason? Seems like we can remove /try node. + /// Initiator will fail to commit entry to ZK (including ops for replicated table) if /try does not exist. + /// But it's questionable. + + /// We use tryRemove(...) because multiple hosts (including initiator) may try to do it concurrently. + auto code = zookeeper->tryRemove(try_node_path); + if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE) + throw Coordination::Exception(code, try_node_path); + + if (!zookeeper->exists(entry_path + "/committed")) + { + out_reason = fmt::format("Entry {} was forcefully cancelled due to timeout", entry_name); + return {}; + } + } + } } if (!zookeeper->exists(entry_path + "/committed")) { - out_reason = "Entry " + entry_name + " hasn't been committed"; + out_reason = fmt::format("Entry {} hasn't been committed", entry_name); return {}; } @@ -154,7 +209,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na { assert(!zookeeper->exists(entry_path + "/try")); assert(zookeeper->exists(entry_path + "/committed") == (zookeeper->get(task->getFinishedNodePath()) == "0")); - out_reason = "Entry " + entry_name + " has been executed as initial query"; + out_reason = fmt::format("Entry {} has been executed as initial query", entry_name); return {}; } @@ -169,8 +224,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na if (task->entry.query.empty()) { - //TODO better way to determine special entries - out_reason = "It's dummy task"; + out_reason = fmt::format("Entry {} is a dummy task", entry_name); return {}; } @@ -178,7 +232,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na if (zookeeper->exists(task->getFinishedNodePath())) { - out_reason = "Task has been already processed"; + out_reason = fmt::format("Task {} has been already processed", entry_name); return {}; } diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h index 6e29e48469b..e3fd58c4305 100644 --- a/src/Databases/DatabaseReplicatedWorker.h +++ b/src/Databases/DatabaseReplicatedWorker.h @@ -13,7 +13,7 @@ public: String enqueueQuery(DDLLogEntry & entry) override; - String tryEnqueueAndExecuteEntry(DDLLogEntry & entry); + String tryEnqueueAndExecuteEntry(DDLLogEntry & entry, const Context & query_context); private: void initializeMainThread() override; diff --git a/src/Interpreters/DDLTask.cpp b/src/Interpreters/DDLTask.cpp index 55e613648ae..9737167fa4c 100644 --- a/src/Interpreters/DDLTask.cpp +++ b/src/Interpreters/DDLTask.cpp @@ -309,13 +309,9 @@ std::unique_ptr DatabaseReplicatedTask::makeQueryContext(Context & from { txn->ops.emplace_back(zkutil::makeRemoveRequest(entry_path + "/try", -1)); txn->ops.emplace_back(zkutil::makeCreateRequest(entry_path + "/committed", host_id_str, zkutil::CreateMode::Persistent)); - //txn->ops.emplace_back(zkutil::makeRemoveRequest(getActiveNodePath(), -1)); txn->ops.emplace_back(zkutil::makeSetRequest(database->zookeeper_path + "/max_log_ptr", toString(getLogEntryNumber(entry_name)), -1)); } - //if (execute_on_leader) - // txn->ops.emplace_back(zkutil::makeCreateRequest(getShardNodePath() + "/executed", host_id_str, zkutil::CreateMode::Persistent)); - //txn->ops.emplace_back(zkutil::makeCreateRequest(getFinishedNodePath(), execution_status.serializeText(), zkutil::CreateMode::Persistent)); txn->ops.emplace_back(zkutil::makeSetRequest(database->replica_path + "/log_ptr", toString(getLogEntryNumber(entry_name)), -1)); std::move(ops.begin(), ops.end(), std::back_inserter(txn->ops)); diff --git a/src/Interpreters/DDLTask.h b/src/Interpreters/DDLTask.h index 49f6d74a931..552f4919765 100644 --- a/src/Interpreters/DDLTask.h +++ b/src/Interpreters/DDLTask.h @@ -81,7 +81,6 @@ struct DDLTaskBase bool is_circular_replicated = false; bool execute_on_leader = false; - //MetadataTransactionPtr txn; Coordination::Requests ops; ExecutionStatus execution_status; bool was_executed = false; @@ -163,6 +162,7 @@ struct MetadataTransaction void commit(); + ~MetadataTransaction() { assert(state != CREATED || std::uncaught_exception()); } }; } diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 545e00296e8..da2e878541d 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -341,7 +341,8 @@ void DDLWorker::scheduleTasks() { /// We will recheck status of last executed tasks. It's useful if main thread was just restarted. auto & min_task = *std::min_element(current_tasks.begin(), current_tasks.end()); - begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), min_task->entry_name); + String min_entry_name = last_skipped_entry_name ? std::min(min_task->entry_name, *last_skipped_entry_name) : min_task->entry_name; + begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), min_entry_name); current_tasks.clear(); } @@ -358,6 +359,7 @@ void DDLWorker::scheduleTasks() { LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason); updateMaxDDLEntryID(entry_name); + last_skipped_entry_name.emplace(entry_name); continue; } @@ -500,10 +502,7 @@ void DDLWorker::processTask(DDLTaskBase & task) { /// It's not CREATE DATABASE auto table_id = context.tryResolveStorageID(*query_with_table, Context::ResolveOrdinary); - DatabasePtr database; - std::tie(database, storage) = DatabaseCatalog::instance().tryGetDatabaseAndTable(table_id, context); - if (database && database->getEngineName() == "Replicated" && !typeid_cast(&task)) - throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER queries are not allowed for Replicated databases"); + storage = DatabaseCatalog::instance().tryGetTable(table_id, context); } task.execute_on_leader = storage && taskShouldBeExecutedOnLeader(task.query, storage) && !task.is_circular_replicated; @@ -553,7 +552,8 @@ void DDLWorker::processTask(DDLTaskBase & task) updateMaxDDLEntryID(task.entry_name); /// FIXME: if server fails right here, the task will be executed twice. We need WAL here. - /// If ZooKeeper connection is lost here, we will try again to write query status. + /// NOTE: If ZooKeeper connection is lost here, we will try again to write query status. + /// NOTE: If both table and database are replicated, task is executed in single ZK transaction. bool status_written = task.ops.empty(); if (!status_written) @@ -959,12 +959,6 @@ void DDLWorker::runMainThread() initialized = false; LOG_INFO(log, "Lost ZooKeeper connection, will try to connect again: {}", getCurrentExceptionMessage(true)); } - else if (e.code == Coordination::Error::ZNONODE) - { - // TODO add comment: when it happens and why it's expected? - // maybe because cleanup thread may remove nodes inside queue entry which are currently processed - LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true)); - } else { LOG_ERROR(log, "Unexpected ZooKeeper error, will try to restart main thread: {}", getCurrentExceptionMessage(true)); diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index d9fd4e58cb6..706face3885 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -115,7 +115,7 @@ protected: ZooKeeperPtr current_zookeeper; /// Save state of executed task to avoid duplicate execution on ZK error - //std::optional last_entry_name; + std::optional last_skipped_entry_name; std::list current_tasks; std::shared_ptr queue_updated_event = std::make_shared(); diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index 6313da7132d..f27fb93b2d4 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -609,7 +609,7 @@ DatabaseCatalog::updateDependency(const StorageID & old_from, const StorageID & view_dependencies[{new_from.getDatabaseName(), new_from.getTableName()}].insert(new_where); } -std::unique_ptr DatabaseCatalog::getDDLGuard(const String & database, const String & table) +DDLGuardPtr DatabaseCatalog::getDDLGuard(const String & database, const String & table) { std::unique_lock lock(ddl_guards_mutex); auto db_guard_iter = ddl_guards.try_emplace(database).first; diff --git a/src/Interpreters/DatabaseCatalog.h b/src/Interpreters/DatabaseCatalog.h index c9f031ef678..bb82dbfc440 100644 --- a/src/Interpreters/DatabaseCatalog.h +++ b/src/Interpreters/DatabaseCatalog.h @@ -67,6 +67,8 @@ private: bool is_database_guard = false; }; +using DDLGuardPtr = std::unique_ptr; + /// Creates temporary table in `_temporary_and_external_tables` with randomly generated unique StorageID. /// Such table can be accessed from everywhere by its ID. @@ -120,7 +122,7 @@ public: void loadDatabases(); /// Get an object that protects the table from concurrently executing multiple DDL operations. - std::unique_ptr getDDLGuard(const String & database, const String & table); + DDLGuardPtr getDDLGuard(const String & database, const String & table); /// Get an object that protects the database from concurrent DDL queries all tables in the database std::unique_lock getExclusiveDDLGuardForDatabase(const String & database); diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 612f9833af5..cee9b9083ea 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -53,7 +53,7 @@ BlockIO InterpreterAlterQuery::execute() { auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name); guard->releaseTableLock(); - return typeid_cast(database.get())->propose(query_ptr); + return typeid_cast(database.get())->propose(query_ptr, context); } StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context); diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 8d344545c8a..6af212172b2 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -886,7 +886,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) { assertOrSetUUID(create, database); guard->releaseTableLock(); - return typeid_cast(database.get())->propose(query_ptr); + return typeid_cast(database.get())->propose(query_ptr, context); } } diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index db2f463893e..b22d46358f9 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -139,7 +139,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat ddl_guard->releaseTableLock(); table.reset(); - return typeid_cast(database.get())->propose(query.clone()); + return typeid_cast(database.get())->propose(query.clone(), context); } if (query.kind == ASTDropQuery::Kind::Detach) @@ -325,6 +325,8 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query, if (database->getEngineName() == "MaterializeMySQL") stopDatabaseSynchronization(database); #endif + if (auto * replicated = typeid_cast(database.get())) + replicated->stopReplication(); if (database->shouldBeEmptyOnDetach()) { diff --git a/src/Interpreters/InterpreterRenameQuery.cpp b/src/Interpreters/InterpreterRenameQuery.cpp index d2f79ba071c..5bfc144e014 100644 --- a/src/Interpreters/InterpreterRenameQuery.cpp +++ b/src/Interpreters/InterpreterRenameQuery.cpp @@ -90,7 +90,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c UniqueTableName to(elem.to_database_name, elem.to_table_name); ddl_guards[from]->releaseTableLock(); ddl_guards[to]->releaseTableLock(); - return typeid_cast(database.get())->propose(query_ptr); + return typeid_cast(database.get())->propose(query_ptr, context); } else { diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index 273e00c8a23..adee777f900 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -103,6 +103,27 @@ "memory_tracking", /// FIXME remove it before merge "memory_tracking", "memory_usage", + "01533_multiple_nested", + "01575_disable_detach_table_of_dictionary", + "01457_create_as_table_function_structure", + "01415_inconsistent_merge_tree_settings", + "01413_allow_non_metadata_alters", + "01378_alter_rename_with_ttl_zookeeper", + "01349_mutation_datetime_key", + "01325_freeze_mutation_stuck", + "01272_suspicious_codecs", + "01181_db_atomic_drop_on_cluster", + "00957_delta_diff_bug", + "00910_zookeeper_custom_compression_codecs_replicated", + "00899_long_attach_memory_limit", + "00804_test_custom_compression_codes_log_storages", + "00804_test_alter_compression_codecs", + "00804_test_delta_codec_no_type_alter", + "00804_test_custom_compression_codecs", + "00753_alter_attach", + "00715_fetch_merged_or_mutated_part_zookeeper", + "00688_low_cardinality_serialization", + "01575_disable_detach_table_of_dictionary", "00738_lock_for_inner_table", "01666_blns", "01652_ignore_and_low_cardinality",