diff --git a/dbms/programs/server/InterserverIOHTTPHandler.cpp b/dbms/programs/server/InterserverIOHTTPHandler.cpp index 5302302bb5b..407d3c41a9b 100644 --- a/dbms/programs/server/InterserverIOHTTPHandler.cpp +++ b/dbms/programs/server/InterserverIOHTTPHandler.cpp @@ -61,6 +61,10 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque ReadBufferFromIStream body(request.stream()); auto endpoint = server.context().getInterserverIOHandler().getEndpoint(endpoint_name); + /// Locked for read while query processing + std::shared_lock lock(endpoint->rwlock); + if (endpoint->blocker.isCancelled()) + throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED); if (compress) { diff --git a/dbms/src/Interpreters/InterserverIOHandler.h b/dbms/src/Interpreters/InterserverIOHandler.h index 7cef5df9866..4651c8cb978 100644 --- a/dbms/src/Interpreters/InterserverIOHandler.h +++ b/dbms/src/Interpreters/InterserverIOHandler.h @@ -11,6 +11,7 @@ #include #include #include +#include #include namespace Poco { namespace Net { class HTTPServerResponse; } } @@ -24,42 +25,6 @@ namespace ErrorCodes extern const int NO_SUCH_INTERSERVER_IO_ENDPOINT; } -/** Location of the service. - */ -struct InterserverIOEndpointLocation -{ -public: - InterserverIOEndpointLocation(const std::string & name_, const std::string & host_, UInt16 port_) - : name(name_), host(host_), port(port_) - { - } - - /// Creates a location based on its serialized representation. - InterserverIOEndpointLocation(const std::string & serialized_location) - { - ReadBufferFromString buf(serialized_location); - readBinary(name, buf); - readBinary(host, buf); - readBinary(port, buf); - assertEOF(buf); - } - - /// Serializes the location. - std::string toString() const - { - WriteBufferFromOwnString buf; - writeBinary(name, buf); - writeBinary(host, buf); - writeBinary(port, buf); - return buf.str(); - } - -public: - std::string name; - std::string host; - UInt16 port; -}; - /** Query processor from other servers. */ class InterserverIOEndpoint @@ -71,6 +36,7 @@ public: /// You need to stop the data transfer if blocker is activated. ActionBlocker blocker; + std::shared_mutex rwlock; }; using InterserverIOEndpointPtr = std::shared_ptr; @@ -90,11 +56,10 @@ public: throw Exception("Duplicate interserver IO endpoint: " + name, ErrorCodes::DUPLICATE_INTERSERVER_IO_ENDPOINT); } - void removeEndpoint(const String & name) + bool removeEndpointIfExists(const String & name) { std::lock_guard lock(mutex); - if (!endpoint_map.erase(name)) - throw Exception("No interserver IO endpoint named " + name, ErrorCodes::NO_SUCH_INTERSERVER_IO_ENDPOINT); + return endpoint_map.erase(name); } InterserverIOEndpointPtr getEndpoint(const String & name) @@ -115,41 +80,4 @@ private: std::mutex mutex; }; -/// In the constructor calls `addEndpoint`, in the destructor - `removeEndpoint`. -class InterserverIOEndpointHolder -{ -public: - InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint_, InterserverIOHandler & handler_) - : name(name_), endpoint(std::move(endpoint_)), handler(handler_) - { - handler.addEndpoint(name, endpoint); - } - - InterserverIOEndpointPtr getEndpoint() - { - return endpoint; - } - - ~InterserverIOEndpointHolder() - try - { - handler.removeEndpoint(name); - /// After destroying the object, `endpoint` can still live, since its ownership is acquired during the processing of the request, - /// see InterserverIOHTTPHandler.cpp - } - catch (...) - { - tryLogCurrentException("~InterserverIOEndpointHolder"); - } - - ActionBlocker & getBlocker() { return endpoint->blocker; } - -private: - String name; - InterserverIOEndpointPtr endpoint; - InterserverIOHandler & handler; -}; - -using InterserverIOEndpointHolderPtr = std::shared_ptr; - } diff --git a/dbms/src/Storages/IStorage_fwd.h b/dbms/src/Storages/IStorage_fwd.h index e80fa2a0eb6..4983a734d21 100644 --- a/dbms/src/Storages/IStorage_fwd.h +++ b/dbms/src/Storages/IStorage_fwd.h @@ -11,7 +11,6 @@ namespace DB class IStorage; using StoragePtr = std::shared_ptr; -using StorageWeakPtr = std::weak_ptr; using Tables = std::map; } diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index 12137b4f023..e459de6fa58 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -1,8 +1,6 @@ #include -#include #include #include -#include #include #include #include @@ -53,9 +51,6 @@ std::string Service::getId(const std::string & node_id) const void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) { - if (blocker.isCancelled()) - throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED); - String client_protocol_version = params.get("client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE); @@ -88,15 +83,11 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo ++data.current_table_sends; SCOPE_EXIT({--data.current_table_sends;}); - StoragePtr owned_storage = storage.lock(); - if (!owned_storage) - throw Exception("The table was already dropped", ErrorCodes::UNKNOWN_TABLE); - LOG_TRACE(log, "Sending part " << part_name); try { - auto storage_lock = owned_storage->lockStructureForShare(false, RWLockImpl::NO_QUERY); + auto storage_lock = data.lockStructureForShare(false, RWLockImpl::NO_QUERY); MergeTreeData::DataPartPtr part = findPart(part_name); diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.h b/dbms/src/Storages/MergeTree/DataPartsExchange.h index 00d46870866..c0e8c0d2331 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.h +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.h @@ -20,8 +20,8 @@ namespace DataPartsExchange class Service final : public InterserverIOEndpoint { public: - Service(MergeTreeData & data_, StoragePtr & storage_) : data(data_), - storage(storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsService)")) {} + Service(MergeTreeData & data_) + : data(data_), log(&Logger::get(data.getLogName() + " (Replicated PartsService)")) {} Service(const Service &) = delete; Service & operator=(const Service &) = delete; @@ -33,8 +33,9 @@ private: MergeTreeData::DataPartPtr findPart(const String & name); private: + /// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish, + /// so Service will never access dangling reference to storage MergeTreeData & data; - StorageWeakPtr storage; Logger * log; }; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp index c639275158f..31541cc431b 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp @@ -99,8 +99,8 @@ void ReplicatedMergeTreeAlterThread::run() /// Temporarily cancel parts sending ActionLock data_parts_exchange_blocker; - if (storage.data_parts_exchange_endpoint_holder) - data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->getBlocker().cancel(); + if (storage.data_parts_exchange_endpoint) + data_parts_exchange_blocker = storage.data_parts_exchange_endpoint->blocker.cancel(); /// Temporarily cancel part fetches auto fetches_blocker = storage.fetcher.blocker.cancel(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index c65b05ef67b..8d784c07cfa 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2917,10 +2917,8 @@ void StorageReplicatedMergeTree::startup() database_name + "." + table_name + " (ReplicatedMergeTreeQueue)", getDataParts()); - StoragePtr ptr = shared_from_this(); - InterserverIOEndpointPtr data_parts_exchange_endpoint = std::make_shared(*this, ptr); - data_parts_exchange_endpoint_holder = std::make_shared( - data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler()); + data_parts_exchange_endpoint = std::make_shared(*this); + global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint); queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); }); if (areBackgroundMovesNeeded()) @@ -2952,11 +2950,15 @@ void StorageReplicatedMergeTree::shutdown() global_context.getBackgroundMovePool().removeTask(move_parts_task_handle); move_parts_task_handle.reset(); - if (data_parts_exchange_endpoint_holder) + if (data_parts_exchange_endpoint) { - data_parts_exchange_endpoint_holder->getBlocker().cancelForever(); - data_parts_exchange_endpoint_holder = nullptr; + global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_endpoint->getId(replica_path)); + /// Ask all parts exchange handlers to finish asap. New ones will fail to start + data_parts_exchange_endpoint->blocker.cancelForever(); + /// Wait for all of them + std::unique_lock lock(data_parts_exchange_endpoint->rwlock); } + data_parts_exchange_endpoint.reset(); } @@ -5206,7 +5208,7 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti return fetcher.blocker.cancel(); if (action_type == ActionLocks::PartsSend) - return data_parts_exchange_endpoint_holder ? data_parts_exchange_endpoint_holder->getBlocker().cancel() : ActionLock(); + return data_parts_exchange_endpoint ? data_parts_exchange_endpoint->blocker.cancel() : ActionLock(); if (action_type == ActionLocks::ReplicationQueue) return queue.actions_blocker.cancel(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 60c2ea0b870..d988e86b83b 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -233,7 +233,7 @@ private: std::atomic is_leader {false}; zkutil::LeaderElectionPtr leader_election; - InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder; + InterserverIOEndpointPtr data_parts_exchange_endpoint; MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer;