get rid of StorageWeakPtr

This commit is contained in:
Alexander Tokmakov 2020-01-14 17:27:48 +03:00
parent e63ef08af8
commit ff1b7e1386
8 changed files with 26 additions and 101 deletions

View File

@ -61,6 +61,10 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque
ReadBufferFromIStream body(request.stream());
auto endpoint = server.context().getInterserverIOHandler().getEndpoint(endpoint_name);
/// Locked for read while query processing
std::shared_lock lock(endpoint->rwlock);
if (endpoint->blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
if (compress)
{

View File

@ -11,6 +11,7 @@
#include <map>
#include <atomic>
#include <utility>
#include <shared_mutex>
#include <Poco/Net/HTMLForm.h>
namespace Poco { namespace Net { class HTTPServerResponse; } }
@ -24,42 +25,6 @@ namespace ErrorCodes
extern const int NO_SUCH_INTERSERVER_IO_ENDPOINT;
}
/** Location of the service.
*/
struct InterserverIOEndpointLocation
{
public:
InterserverIOEndpointLocation(const std::string & name_, const std::string & host_, UInt16 port_)
: name(name_), host(host_), port(port_)
{
}
/// Creates a location based on its serialized representation.
InterserverIOEndpointLocation(const std::string & serialized_location)
{
ReadBufferFromString buf(serialized_location);
readBinary(name, buf);
readBinary(host, buf);
readBinary(port, buf);
assertEOF(buf);
}
/// Serializes the location.
std::string toString() const
{
WriteBufferFromOwnString buf;
writeBinary(name, buf);
writeBinary(host, buf);
writeBinary(port, buf);
return buf.str();
}
public:
std::string name;
std::string host;
UInt16 port;
};
/** Query processor from other servers.
*/
class InterserverIOEndpoint
@ -71,6 +36,7 @@ public:
/// You need to stop the data transfer if blocker is activated.
ActionBlocker blocker;
std::shared_mutex rwlock;
};
using InterserverIOEndpointPtr = std::shared_ptr<InterserverIOEndpoint>;
@ -90,11 +56,10 @@ public:
throw Exception("Duplicate interserver IO endpoint: " + name, ErrorCodes::DUPLICATE_INTERSERVER_IO_ENDPOINT);
}
void removeEndpoint(const String & name)
bool removeEndpointIfExists(const String & name)
{
std::lock_guard lock(mutex);
if (!endpoint_map.erase(name))
throw Exception("No interserver IO endpoint named " + name, ErrorCodes::NO_SUCH_INTERSERVER_IO_ENDPOINT);
return endpoint_map.erase(name);
}
InterserverIOEndpointPtr getEndpoint(const String & name)
@ -115,41 +80,4 @@ private:
std::mutex mutex;
};
/// In the constructor calls `addEndpoint`, in the destructor - `removeEndpoint`.
class InterserverIOEndpointHolder
{
public:
InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint_, InterserverIOHandler & handler_)
: name(name_), endpoint(std::move(endpoint_)), handler(handler_)
{
handler.addEndpoint(name, endpoint);
}
InterserverIOEndpointPtr getEndpoint()
{
return endpoint;
}
~InterserverIOEndpointHolder()
try
{
handler.removeEndpoint(name);
/// After destroying the object, `endpoint` can still live, since its ownership is acquired during the processing of the request,
/// see InterserverIOHTTPHandler.cpp
}
catch (...)
{
tryLogCurrentException("~InterserverIOEndpointHolder");
}
ActionBlocker & getBlocker() { return endpoint->blocker; }
private:
String name;
InterserverIOEndpointPtr endpoint;
InterserverIOHandler & handler;
};
using InterserverIOEndpointHolderPtr = std::shared_ptr<InterserverIOEndpointHolder>;
}

View File

@ -11,7 +11,6 @@ namespace DB
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
using StorageWeakPtr = std::weak_ptr<IStorage>;
using Tables = std::map<String, StoragePtr>;
}

View File

@ -1,8 +1,6 @@
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/IStorage.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <Common/typeid_cast.h>
#include <IO/HTTPCommon.h>
#include <Poco/File.h>
#include <ext/scope_guard.h>
@ -53,9 +51,6 @@ std::string Service::getId(const std::string & node_id) const
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
String client_protocol_version = params.get("client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
@ -88,15 +83,11 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
++data.current_table_sends;
SCOPE_EXIT({--data.current_table_sends;});
StoragePtr owned_storage = storage.lock();
if (!owned_storage)
throw Exception("The table was already dropped", ErrorCodes::UNKNOWN_TABLE);
LOG_TRACE(log, "Sending part " << part_name);
try
{
auto storage_lock = owned_storage->lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto storage_lock = data.lockStructureForShare(false, RWLockImpl::NO_QUERY);
MergeTreeData::DataPartPtr part = findPart(part_name);

View File

@ -20,8 +20,8 @@ namespace DataPartsExchange
class Service final : public InterserverIOEndpoint
{
public:
Service(MergeTreeData & data_, StoragePtr & storage_) : data(data_),
storage(storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
Service(MergeTreeData & data_)
: data(data_), log(&Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
@ -33,8 +33,9 @@ private:
MergeTreeData::DataPartPtr findPart(const String & name);
private:
/// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish,
/// so Service will never access dangling reference to storage
MergeTreeData & data;
StorageWeakPtr storage;
Logger * log;
};

View File

@ -99,8 +99,8 @@ void ReplicatedMergeTreeAlterThread::run()
/// Temporarily cancel parts sending
ActionLock data_parts_exchange_blocker;
if (storage.data_parts_exchange_endpoint_holder)
data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->getBlocker().cancel();
if (storage.data_parts_exchange_endpoint)
data_parts_exchange_blocker = storage.data_parts_exchange_endpoint->blocker.cancel();
/// Temporarily cancel part fetches
auto fetches_blocker = storage.fetcher.blocker.cancel();

View File

@ -2917,10 +2917,8 @@ void StorageReplicatedMergeTree::startup()
database_name + "." + table_name + " (ReplicatedMergeTreeQueue)",
getDataParts());
StoragePtr ptr = shared_from_this();
InterserverIOEndpointPtr data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this, ptr);
data_parts_exchange_endpoint_holder = std::make_shared<InterserverIOEndpointHolder>(
data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler());
data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this);
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint);
queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); });
if (areBackgroundMovesNeeded())
@ -2952,11 +2950,15 @@ void StorageReplicatedMergeTree::shutdown()
global_context.getBackgroundMovePool().removeTask(move_parts_task_handle);
move_parts_task_handle.reset();
if (data_parts_exchange_endpoint_holder)
if (data_parts_exchange_endpoint)
{
data_parts_exchange_endpoint_holder->getBlocker().cancelForever();
data_parts_exchange_endpoint_holder = nullptr;
global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_endpoint->getId(replica_path));
/// Ask all parts exchange handlers to finish asap. New ones will fail to start
data_parts_exchange_endpoint->blocker.cancelForever();
/// Wait for all of them
std::unique_lock lock(data_parts_exchange_endpoint->rwlock);
}
data_parts_exchange_endpoint.reset();
}
@ -5206,7 +5208,7 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti
return fetcher.blocker.cancel();
if (action_type == ActionLocks::PartsSend)
return data_parts_exchange_endpoint_holder ? data_parts_exchange_endpoint_holder->getBlocker().cancel() : ActionLock();
return data_parts_exchange_endpoint ? data_parts_exchange_endpoint->blocker.cancel() : ActionLock();
if (action_type == ActionLocks::ReplicationQueue)
return queue.actions_blocker.cancel();

View File

@ -233,7 +233,7 @@ private:
std::atomic<bool> is_leader {false};
zkutil::LeaderElectionPtr leader_election;
InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder;
InterserverIOEndpointPtr data_parts_exchange_endpoint;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;