diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index b238c48c002..8a7b9a245e4 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -690,6 +690,10 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create, /// Then background task is created by "startup" method. And when destructor of a table object is called, background task is still active, /// and the task will use references to freed data. + /// Also note that "startup" method is exception-safe. If exception is thrown from "startup", + /// we can safely destroy the object without a call to "shutdown", because there is guarantee + /// that no background threads/similar resources remain after exception from "startup". + res->startup(); return true; } diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 77f2ba1c288..9b4fdcae23f 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -450,7 +450,6 @@ void StorageBuffer::startup() LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate system_profile to fix this.", getName()); } - flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ flushBack(); }); flush_handle->activateAndSchedule(); } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 47b1c3d2837..7cd38e9081f 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -95,16 +95,36 @@ void StorageMergeTree::startup() /// NOTE background task will also do the above cleanups periodically. time_after_previous_cleanup.restart(); - auto & merge_pool = global_context.getBackgroundPool(); - merging_mutating_task_handle = merge_pool.createTask([this] { return mergeMutateTask(); }); - /// Ensure that thread started only after assignment to 'merging_mutating_task_handle' is done. - merge_pool.startTask(merging_mutating_task_handle); - - if (areBackgroundMovesNeeded()) + try { - auto & move_pool = global_context.getBackgroundMovePool(); - moving_task_handle = move_pool.createTask([this] { return movePartsTask(); }); - move_pool.startTask(moving_task_handle); + auto & merge_pool = global_context.getBackgroundPool(); + merging_mutating_task_handle = merge_pool.createTask([this] { return mergeMutateTask(); }); + /// Ensure that thread started only after assignment to 'merging_mutating_task_handle' is done. + merge_pool.startTask(merging_mutating_task_handle); + + if (areBackgroundMovesNeeded()) + { + auto & move_pool = global_context.getBackgroundMovePool(); + moving_task_handle = move_pool.createTask([this] { return movePartsTask(); }); + move_pool.startTask(moving_task_handle); + } + } + catch (...) + { + /// Exception safety: failed "startup" does not require a call to "shutdown" from the caller. + /// And it should be able to safely destroy table after exception in "startup" method. + /// It means that failed "startup" must not create any background tasks that we will have to wait. + try + { + shutdown(); + } + catch (...) + { + std::terminate(); + } + + /// Note: after failed "startup", the table will be in a state that only allows to destroy the object. + throw; } } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 95e36ef24ad..e65df7f7160 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2907,44 +2907,60 @@ void StorageReplicatedMergeTree::startup() if (is_readonly) return; - queue.initialize( - zookeeper_path, replica_path, - getStorageID().getFullTableName() + " (ReplicatedMergeTreeQueue)", - getDataParts()); - - data_parts_exchange_endpoint = std::make_shared(*this); - global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint); - - /// In this thread replica will be activated. - restarting_thread.start(); - - /// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attmept to do it - startup_event.wait(); - - /// If we don't separate create/start steps, race condition will happen - /// between the assignment of queue_task_handle and queueTask that use the queue_task_handle. + try { - auto lock = queue.lockQueue(); - auto & pool = global_context.getBackgroundPool(); - queue_task_handle = pool.createTask([this] { return queueTask(); }); - pool.startTask(queue_task_handle); - } + queue.initialize( + zookeeper_path, replica_path, + getStorageID().getFullTableName() + " (ReplicatedMergeTreeQueue)", + getDataParts()); - if (areBackgroundMovesNeeded()) - { - auto & pool = global_context.getBackgroundMovePool(); - move_parts_task_handle = pool.createTask([this] { return movePartsTask(); }); - pool.startTask(move_parts_task_handle); + data_parts_exchange_endpoint = std::make_shared(*this); + global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint); + + /// In this thread replica will be activated. + restarting_thread.start(); + + /// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attmept to do it + startup_event.wait(); + + /// If we don't separate create/start steps, race condition will happen + /// between the assignment of queue_task_handle and queueTask that use the queue_task_handle. + { + auto lock = queue.lockQueue(); + auto & pool = global_context.getBackgroundPool(); + queue_task_handle = pool.createTask([this] { return queueTask(); }); + pool.startTask(queue_task_handle); + } + + if (areBackgroundMovesNeeded()) + { + auto & pool = global_context.getBackgroundMovePool(); + move_parts_task_handle = pool.createTask([this] { return movePartsTask(); }); + pool.startTask(move_parts_task_handle); + } + } + catch (...) + { + /// Exception safety: failed "startup" does not require a call to "shutdown" from the caller. + /// And it should be able to safely destroy table after exception in "startup" method. + /// It means that failed "startup" must not create any background tasks that we will have to wait. + try + { + shutdown(); + } + catch (...) + { + std::terminate(); + } + + /// Note: after failed "startup", the table will be in a state that only allows to destroy the object. + throw; } - need_shutdown.store(true); } void StorageReplicatedMergeTree::shutdown() { - if (!need_shutdown.load()) - return; - clearOldPartsFromFilesystem(true); /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. fetcher.blocker.cancelForever(); @@ -2981,7 +2997,6 @@ void StorageReplicatedMergeTree::shutdown() std::unique_lock lock(data_parts_exchange_endpoint->rwlock); } data_parts_exchange_endpoint.reset(); - need_shutdown.store(false); } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index f01e51bd769..b82b387a623 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -288,8 +288,6 @@ private: /// True if replica was created for existing table with fixed granularity bool other_replicas_fixed_granularity = false; - std::atomic_bool need_shutdown{false}; - template void foreachCommittedParts(const Func & func) const;