diff --git a/dbms/src/Common/ActionBlocker.h b/dbms/src/Common/ActionBlocker.h new file mode 100644 index 00000000000..59f54ca2ee7 --- /dev/null +++ b/dbms/src/Common/ActionBlocker.h @@ -0,0 +1,59 @@ +#pragma once + +#include + +namespace DB +{ + +/// An atomic variable that is used to block and interrupt certain actions +/// If it is not zero then actions related with it should be considered as interrupted +struct ActionBlocker +{ + mutable std::atomic counter{0}; + + bool isCancelled() const { return counter > 0; } + + /// Temporarily blocks merges (while the returned object is alive) + struct BlockHolder; + BlockHolder cancel() const { return BlockHolder(this); } + + // Cancel the actions forever. + void cancelForever() const { ++counter; } + + /// Blocks related action while a BlockerHolder instance exists + struct BlockHolder + { + explicit BlockHolder(const ActionBlocker * var_ = nullptr) : var(var_) + { + if (var) + ++var->counter; + } + + BlockHolder(BlockHolder && other) noexcept + { + var = other.var; + other.var = nullptr; + } + + BlockHolder & operator=(BlockHolder && other) noexcept + { + var = other.var; + other.var = nullptr; + return *this; + } + + BlockHolder(const BlockHolder & other) = delete; + BlockHolder & operator=(const BlockHolder & other) = delete; + + ~BlockHolder() + { + if (var) + --var->counter; + } + + private: + const ActionBlocker * var = nullptr; + }; +}; + +} diff --git a/dbms/src/IO/copyData.cpp b/dbms/src/IO/copyData.cpp index ec47a335fd7..a1b34b7b774 100644 --- a/dbms/src/IO/copyData.cpp +++ b/dbms/src/IO/copyData.cpp @@ -10,7 +10,7 @@ namespace DB namespace { -void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::atomic * is_cancelled) +void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::atomic * is_cancelled) { /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false. while (bytes > 0 && !from.eof()) @@ -55,7 +55,7 @@ void copyData(ReadBuffer & from, WriteBuffer & to) copyDataImpl(from, to, false, std::numeric_limits::max(), nullptr); } -void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic & is_cancelled) +void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic & is_cancelled) { copyDataImpl(from, to, false, std::numeric_limits::max(), &is_cancelled); } @@ -70,7 +70,7 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes) copyDataImpl(from, to, true, bytes, nullptr); } -void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic & is_cancelled) +void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic & is_cancelled) { copyDataImpl(from, to, true, bytes, &is_cancelled); } diff --git a/dbms/src/IO/copyData.h b/dbms/src/IO/copyData.h index ea5ea22b19c..890965d8523 100644 --- a/dbms/src/IO/copyData.h +++ b/dbms/src/IO/copyData.h @@ -21,8 +21,8 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes); /** The same, with the condition to cancel. */ -void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic & is_cancelled); -void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic & is_cancelled); +void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic & is_cancelled); +void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic & is_cancelled); void copyData(ReadBuffer & from, WriteBuffer & to, std::function cancellation_hook); void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function cancellation_hook); diff --git a/dbms/src/Interpreters/InterserverIOHandler.h b/dbms/src/Interpreters/InterserverIOHandler.h index d899ded4de7..e790e311561 100644 --- a/dbms/src/Interpreters/InterserverIOHandler.h +++ b/dbms/src/Interpreters/InterserverIOHandler.h @@ -6,9 +6,11 @@ #include #include #include +#include #include #include #include +#include #include namespace Poco { namespace Net { class HTTPServerResponse; } } @@ -67,11 +69,8 @@ public: virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) = 0; virtual ~InterserverIOEndpoint() {} - void cancel() { is_cancelled = true; } - -protected: - /// You need to stop the data transfer. - std::atomic is_cancelled {false}; + /// You need to stop the data transfer if blocker is activated. + ActionBlocker blocker; }; using InterserverIOEndpointPtr = std::shared_ptr; @@ -88,7 +87,7 @@ public: std::lock_guard lock(mutex); if (endpoint_map.count(name)) throw Exception("Duplicate interserver IO endpoint: " + name, ErrorCodes::DUPLICATE_INTERSERVER_IO_ENDPOINT); - endpoint_map[name] = endpoint; + endpoint_map[name] = std::move(endpoint); } void removeEndpoint(const String & name) @@ -119,7 +118,7 @@ class InterserverIOEndpointHolder { public: InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint_, InterserverIOHandler & handler_) - : name(name_), endpoint(endpoint_), handler(handler_) + : name(name_), endpoint(std::move(endpoint_)), handler(handler_) { handler.addEndpoint(name, endpoint); } @@ -143,7 +142,9 @@ public: } } - void cancel() { endpoint->cancel(); } + ActionBlocker & getBlocker() { return endpoint->blocker; } + void cancelForever() { getBlocker().cancelForever(); } + ActionBlocker::BlockHolder cancel() { return getBlocker().cancel(); } private: String name; diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index 0793d624e20..20c9121e2af 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -47,7 +47,7 @@ std::string Service::getId(const std::string & node_id) const void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) { - if (is_cancelled) + if (blocker.isCancelled()) throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED); String part_name = params.get("part"); @@ -120,9 +120,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body ReadBufferFromFile file_in(path); HashingWriteBuffer hashing_out(out); - copyData(file_in, hashing_out, is_cancelled); + copyData(file_in, hashing_out, blocker.counter); - if (is_cancelled) + if (blocker.isCancelled()) throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED); if (hashing_out.count() != size) @@ -181,14 +181,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( return fetchPartImpl(part_name, replica_path, host, port, "", to_detached); } -MergeTreeData::MutableDataPartPtr Fetcher::fetchShardedPart( - const InterserverIOEndpointLocation & location, - const String & part_name, - size_t shard_no) -{ - return fetchPartImpl(part_name, location.name, location.host, location.port, toString(shard_no), true); -} - MergeTreeData::MutableDataPartPtr Fetcher::fetchPartImpl( const String & part_name, const String & replica_path, @@ -241,9 +233,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPartImpl( WriteBufferFromFile file_out(absolute_part_path + file_name); HashingWriteBuffer hashing_out(file_out); - copyData(in, hashing_out, file_size, is_cancelled); + copyData(in, hashing_out, file_size, blocker.counter); - if (is_cancelled) + if (blocker.isCancelled()) { /// NOTE The is_cancelled flag also makes sense to check every time you read over the network, performing a poll with a not very large timeout. /// And now we check it only between read chunks (in the `copyData` function). diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.h b/dbms/src/Storages/MergeTree/DataPartsExchange.h index 1bb1d906c38..1a6ab687893 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.h +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.h @@ -54,14 +54,8 @@ public: int port, bool to_detached = false); - /// Method for resharding. Downloads a sharded part - /// from the specified shard to the `to_detached` folder. - MergeTreeData::MutableDataPartPtr fetchShardedPart( - const InterserverIOEndpointLocation & location, - const String & part_name, - size_t shard_no); - - void cancel() { is_cancelled = true; } + /// You need to stop the data transfer. + ActionBlocker blocker; private: MergeTreeData::MutableDataPartPtr fetchPartImpl( @@ -74,8 +68,6 @@ private: private: MergeTreeData & data; - /// You need to stop the data transfer. - std::atomic is_cancelled {false}; Logger * log; }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 95787287ea3..f4d46d658f2 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -479,7 +479,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart { static const String TMP_PREFIX = "tmp_merge_"; - if (isCancelled()) + if (merges_blocker.isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); const MergeTreeData::DataPartsVector & parts = future_part.parts; @@ -633,7 +633,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0; Block block; - while (!isCancelled() && (block = merged_stream->read())) + while (!merges_blocker.isCancelled() && (block = merged_stream->read())) { rows_written += block.rows(); to.write(block); @@ -656,7 +656,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart merged_stream->readSuffix(); merged_stream.reset(); - if (isCancelled()) + if (merges_blocker.isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); MergeTreeData::DataPart::Checksums checksums_gathered_columns; @@ -727,7 +727,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes; merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact); - if (isCancelled()) + if (merges_blocker.isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); } @@ -1097,7 +1097,7 @@ size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataP void MergeTreeDataMerger::abortReshardPartitionIfRequested() { - if (isCancelled()) + if (merges_blocker.isCancelled()) throw Exception("Cancelled partition resharding", ErrorCodes::ABORTED); if (cancellation_hook) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h index 2a3e65120c8..834c56fa4e4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h @@ -4,6 +4,8 @@ #include #include #include +#include + namespace DB { @@ -110,42 +112,15 @@ private: */ MergeTreeData::DataPartsVector selectAllPartsFromPartition(const String & partition_id); - /** Temporarily cancel merges. - */ - class BlockerImpl - { - public: - BlockerImpl(MergeTreeDataMerger * merger_) : merger(merger_) - { - ++merger->cancelled; - } - - ~BlockerImpl() - { - --merger->cancelled; - } - private: - MergeTreeDataMerger * merger; - }; - public: - /** Cancel all merges. All currently running 'mergeParts' methods will throw exception soon. - * All new calls to 'mergeParts' will throw exception till all 'Blocker' objects will be destroyed. + /** Is used to cancel all merges. On cancel() call all currently running 'mergeParts' methods will throw exception soon. + * All new calls to 'mergeParts' will throw exception till all 'BlockHolder' objects will be destroyed. */ - using Blocker = std::unique_ptr; - Blocker cancel() { return std::make_unique(this); } - - /** Cancel all merges forever. - */ - void cancelForever() { ++cancelled; } - - bool isCancelled() const { return cancelled > 0; } - -public: + ActionBlocker merges_blocker; enum class MergeAlgorithm { - Horizontal, /// per-row merge of all columns + Horizontal, /// per-row merge of all columns Vertical /// per-row merge of PK columns, per-column gather for non-PK columns }; @@ -166,8 +141,6 @@ private: CancellationHook cancellation_hook; - std::atomic cancelled {0}; - void abortReshardPartitionIfRequested(); }; diff --git a/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp b/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp index 662e583dfe6..c6c0361ab70 100644 --- a/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp +++ b/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp @@ -39,7 +39,7 @@ std::string Service::getId(const std::string & node_id) const void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) { - if (is_cancelled) + if (blocker.isCancelled()) throw Exception{"RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED}; size_t free_space = DiskSpaceMonitor::getUnreservedFreeSpace(context.getPath()); diff --git a/dbms/src/Storages/MergeTree/RemoteQueryExecutor.cpp b/dbms/src/Storages/MergeTree/RemoteQueryExecutor.cpp index 80296e24cf4..e5c8d53ffb2 100644 --- a/dbms/src/Storages/MergeTree/RemoteQueryExecutor.cpp +++ b/dbms/src/Storages/MergeTree/RemoteQueryExecutor.cpp @@ -39,7 +39,7 @@ std::string Service::getId(const std::string & node_id) const void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) { - if (is_cancelled) + if (blocker.isCancelled()) throw Exception{"RemoteQueryExecutor service terminated", ErrorCodes::ABORTED}; std::string query = params.get("query"); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp index 8de62e18852..c1a666ec84d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp @@ -67,10 +67,10 @@ void ReplicatedMergeTreeAlterThread::run() { /// If you need to lock table structure, then suspend merges. - MergeTreeDataMerger::Blocker merge_blocker; + ActionBlocker::BlockHolder merge_blocker; if (changed_version || force_recheck_parts) - merge_blocker = storage.merger.cancel(); + merge_blocker = storage.merger.merges_blocker.cancel(); MergeTreeData::DataParts parts; @@ -80,6 +80,14 @@ void ReplicatedMergeTreeAlterThread::run() /// Temporarily cancel part checks to avoid locking for long time. auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop(); + /// Temporarily cancel parts sending + ActionBlocker::BlockHolder data_parts_exchange_blocker; + if (storage.data_parts_exchange_endpoint_holder) + data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->cancel(); + + /// Temporarily cancel part fetches + auto fetches_blocker = storage.fetcher.blocker.cancel(); + LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock."); auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index db8ddbd0e5f..84e50db0416 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -594,7 +594,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( sum_parts_size_in_bytes += part->size_in_bytes; } - if (merger.isCancelled()) + if (merger.merges_blocker.isCancelled()) { String reason = "Not executing log entry for part " + entry.new_part_name + " because merges are cancelled now."; LOG_DEBUG(log, reason); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 3e6b68060b1..16096e5e762 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -160,22 +160,22 @@ void ReplicatedMergeTreeRestartingThread::run() try { - storage.endpoint_holder->cancel(); - storage.endpoint_holder = nullptr; + storage.data_parts_exchange_endpoint_holder->cancelForever(); + storage.data_parts_exchange_endpoint_holder = nullptr; - storage.disk_space_monitor_endpoint_holder->cancel(); + storage.disk_space_monitor_endpoint_holder->cancelForever(); storage.disk_space_monitor_endpoint_holder = nullptr; - storage.sharded_partition_uploader_endpoint_holder->cancel(); + storage.sharded_partition_uploader_endpoint_holder->cancelForever(); storage.sharded_partition_uploader_endpoint_holder = nullptr; - storage.remote_query_executor_endpoint_holder->cancel(); + storage.remote_query_executor_endpoint_holder->cancelForever(); storage.remote_query_executor_endpoint_holder = nullptr; - storage.remote_part_checker_endpoint_holder->cancel(); + storage.remote_part_checker_endpoint_holder->cancelForever(); storage.remote_part_checker_endpoint_holder = nullptr; - storage.merger.cancelForever(); + storage.merger.merges_blocker.cancelForever(); partialShutdown(); diff --git a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp index 82385a45c71..60ba0945c7a 100644 --- a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp +++ b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp @@ -82,9 +82,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body WriteBufferFromFile file_out{absolute_part_path + file_name}; HashingWriteBuffer hashing_out{file_out}; - copyData(body, hashing_out, file_size, is_cancelled); + copyData(body, hashing_out, file_size, blocker.counter); - if (is_cancelled) + if (blocker.isCancelled()) { part_file.remove(true); throw Exception{"Fetching of part was cancelled", ErrorCodes::ABORTED}; @@ -96,8 +96,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body if (expected_hash != hashing_out.getHash()) throw Exception{"Checksum mismatch for file " + absolute_part_path + file_name + " transferred from " + replica_path}; - if (file_name != "checksums.txt" && - file_name != "columns.txt") + if (file_name != "checksums.txt" && file_name != "columns.txt") checksums.addFile(file_name, file_size, expected_hash); } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index ba69c2b3b50..88e3ee10051 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -91,7 +91,7 @@ void StorageMergeTree::shutdown() if (shutdown_called) return; shutdown_called = true; - merger.cancelForever(); + merger.merges_blocker.cancelForever(); if (merge_task_handle) background_pool.removeTask(merge_task_handle); } @@ -151,7 +151,7 @@ void StorageMergeTree::alter( const Context & context) { /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time. - auto merge_blocker = merger.cancel(); + auto merge_blocker = merger.merges_blocker.cancel(); auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__); @@ -402,7 +402,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi { /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. - auto merge_blocker = merger.cancel(); + auto merge_blocker = merger.merges_blocker.cancel(); /// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function auto lock_read_structure = lockStructure(false, __PRETTY_FUNCTION__); @@ -462,7 +462,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & query, const ASTPtr & partit { /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. - auto merge_blocker = merger.cancel(); + auto merge_blocker = merger.merges_blocker.cancel(); /// Waits for completion of merge and does not start new ones. auto lock = lockForAlter(__PRETTY_FUNCTION__); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 583f07db82e..355d5fe5e90 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -335,7 +335,7 @@ StoragePtr StorageReplicatedMergeTree::create( { { InterserverIOEndpointPtr endpoint = std::make_shared(res->data, res_ptr); - res->endpoint_holder = get_endpoint_holder(endpoint); + res->data_parts_exchange_endpoint_holder = get_endpoint_holder(endpoint); } /// Services for resharding. @@ -2275,7 +2275,7 @@ void StorageReplicatedMergeTree::shutdown() * Because restarting_thread will wait for finishing of tasks in background pool, * and parts are fetched in that tasks. */ - fetcher.cancel(); + fetcher.blocker.cancelForever(); if (restarting_thread) { @@ -2283,36 +2283,36 @@ void StorageReplicatedMergeTree::shutdown() restarting_thread.reset(); } - if (endpoint_holder) + if (data_parts_exchange_endpoint_holder) { - endpoint_holder->cancel(); - endpoint_holder = nullptr; + data_parts_exchange_endpoint_holder->cancelForever(); + data_parts_exchange_endpoint_holder = nullptr; } if (disk_space_monitor_endpoint_holder) { - disk_space_monitor_endpoint_holder->cancel(); + disk_space_monitor_endpoint_holder->cancelForever(); disk_space_monitor_endpoint_holder = nullptr; } disk_space_monitor_client.cancel(); if (sharded_partition_uploader_endpoint_holder) { - sharded_partition_uploader_endpoint_holder->cancel(); + sharded_partition_uploader_endpoint_holder->cancelForever(); sharded_partition_uploader_endpoint_holder = nullptr; } sharded_partition_uploader_client.cancel(); if (remote_query_executor_endpoint_holder) { - remote_query_executor_endpoint_holder->cancel(); + remote_query_executor_endpoint_holder->cancelForever(); remote_query_executor_endpoint_holder = nullptr; } remote_query_executor_client.cancel(); if (remote_part_checker_endpoint_holder) { - remote_part_checker_endpoint_holder->cancel(); + remote_part_checker_endpoint_holder->cancelForever(); remote_part_checker_endpoint_holder = nullptr; } } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 1099e80693a..33b8658a8d6 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -258,7 +258,7 @@ private: bool is_leader_node = false; std::mutex leader_node_mutex; - InterserverIOEndpointHolderPtr endpoint_holder; + InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder; InterserverIOEndpointHolderPtr disk_space_monitor_endpoint_holder; InterserverIOEndpointHolderPtr sharded_partition_uploader_endpoint_holder; InterserverIOEndpointHolderPtr remote_query_executor_endpoint_holder;