diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 5f34a60c34e..2d89228c7ae 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -213,10 +213,11 @@ std::pair TestKeeperCreateRequest::process(TestKeeper::Contai created_node.is_sequental = is_sequential; std::string path_created = path; + ++it->second.seq_num; + if (is_sequential) { auto seq_num = it->second.seq_num; - ++it->second.seq_num; std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM seq_num_str.exceptions(std::ios::failbit); @@ -228,15 +229,14 @@ std::pair TestKeeperCreateRequest::process(TestKeeper::Contai response.path_created = path_created; container.emplace(path_created, std::move(created_node)); - undo = [&container, path_created, is_sequential = is_sequential, parent_path = it->first] + undo = [&container, path_created, parent_path = it->first] { container.erase(path_created); auto & undo_parent = container.at(parent_path); --undo_parent.stat.cversion; --undo_parent.stat.numChildren; - if (is_sequential) - --undo_parent.seq_num; + --undo_parent.seq_num; }; ++it->second.stat.cversion; diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 418eaf567a4..a7e6c11ca4c 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -170,6 +170,7 @@ void DatabaseReplicated::loadStoredObjects(Context & context, bool has_force_res DatabaseAtomic::loadStoredObjects(context, has_force_restore_data_flag, force_attach); ddl_worker = std::make_unique(this, global_context); + ddl_worker->startup(); } void DatabaseReplicated::onUnexpectedLogEntry(const String & entry_name, const ZooKeeperPtr & zookeeper) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 27deb07d296..ef19c134854 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1487,6 +1487,7 @@ void Context::setDDLWorker(std::unique_ptr ddl_worker) auto lock = getLock(); if (shared->ddl_worker) throw Exception("DDL background thread has already been initialized", ErrorCodes::LOGICAL_ERROR); + ddl_worker->startup(); shared->ddl_worker = std::move(ddl_worker); } diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 12f4c42b467..188d38b8647 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -167,7 +167,10 @@ DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, const Cont host_fqdn = getFQDNOrHostName(); host_fqdn_id = Cluster::Address::toString(host_fqdn, context.getTCPPort()); +} +void DDLWorker::startup() +{ main_thread = ThreadFromGlobalPool(&DDLWorker::runMainThread, this); cleanup_thread = ThreadFromGlobalPool(&DDLWorker::runCleanupThread, this); } @@ -183,8 +186,10 @@ DDLWorker::~DDLWorker() { shutdown(); worker_pool.wait(); - main_thread.join(); - cleanup_thread.join(); + if (main_thread.joinable()) + main_thread.join(); + if (cleanup_thread.joinable()) + cleanup_thread.join(); } @@ -421,7 +426,12 @@ void DDLWorker::enqueueTask(DDLTaskPtr task_ptr) else if (e.code == Coordination::Error::ZNONODE) { LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true)); - // TODO: retry? + if (!current_zookeeper->exists(task_ptr->entry_path)) + { + //FIXME race condition with cleanup thread + LOG_ERROR(log, "Task {} is lost. It probably was removed by other server.", task_ptr->entry_path); + return; + } } else { diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index 02076ae1df1..f41ca0fce8f 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -51,6 +51,7 @@ public: return host_fqdn_id; } + void startup(); void shutdown(); bool isCurrentlyActive() const { return initialized && !stop_flag; } diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 0b7fb3e5431..f201e38be2e 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -136,7 +136,10 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) engine->name = "Atomic"; if (old_style_database) { - engine = makeASTFunction("Replicated", + if (database_name == "test") + engine->name = "Ordinary"; // for stateful tests + else + engine = makeASTFunction("Replicated", std::make_shared(fmt::format("/clickhouse/db/{}/", create.database)), std::make_shared("s1"), std::make_shared("r1")); diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp index 0b44206a2b2..2ca07349cbc 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.cpp +++ b/src/Interpreters/executeDDLQueryOnCluster.cpp @@ -201,6 +201,10 @@ DDLQueryStatusInputStream::DDLQueryStatusInputStream(const String & zk_node_path addTotalRowsApprox(waiting_hosts.size()); timeout_seconds = context.getSettingsRef().distributed_ddl_task_timeout; + + //FIXME revert it before merge + if (context.getSettingsRef().default_database_engine.value == DefaultDatabaseEngine::Ordinary) + timeout_seconds = 10; } Block DDLQueryStatusInputStream::readImpl()