Asynchronously drain connections.

This commit is contained in:
Amos Bird 2021-07-14 21:17:30 +08:00
parent 61361cad9c
commit dbfb699690
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
27 changed files with 414 additions and 102 deletions

View File

@ -1702,14 +1702,15 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
LOG_INFO(log, "All helping tables dropped partition {}", partition_name);
}
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
String ClusterCopier::getRemoteCreateTable(
const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
{
auto remote_context = Context::createCopy(context);
remote_context->setSettings(settings);
String query = "SHOW CREATE TABLE " + getQuotedTable(table);
Block block = getBlockWithAllStreamData(std::make_shared<RemoteBlockInputStream>(
connection, query, InterpreterShowCreateQuery::getSampleBlock(), remote_context));
Block block = getBlockWithAllStreamData(
std::make_shared<RemoteBlockInputStream>(connection, query, InterpreterShowCreateQuery::getSampleBlock(), remote_context));
return typeid_cast<const ColumnString &>(*block.safeGetByPosition(0).column).getDataAt(0).toString();
}
@ -1719,10 +1720,8 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time
{
/// Fetch and parse (possibly) new definition
auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_pull, true);
String create_query_pull_str = getRemoteCreateTable(
task_shard.task_table.table_pull,
*connection_entry,
task_cluster->settings_pull);
String create_query_pull_str
= getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry, task_cluster->settings_pull);
ParserCreateQuery parser_create_query;
const auto & settings = getContext()->getSettingsRef();

View File

@ -59,6 +59,7 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
#include <Storages/registerStorages.h>
#include <DataStreams/ConnectionCollector.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Common/Config/ConfigReloader.h>
@ -503,6 +504,8 @@ if (ThreadFuzzer::instance().isEffective())
// ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well.
GlobalThreadPool::initialize(config().getUInt("max_thread_pool_size", 10000));
ConnectionCollector::init(global_context, config().getUInt("max_threads_for_connection_collector", 10));
bool has_zookeeper = config().has("zookeeper");
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });

View File

@ -28,6 +28,8 @@ HedgedConnections::HedgedConnections(
std::shared_ptr<QualifiedTableName> table_to_check_)
: hedged_connections_factory(pool_, &settings_, timeouts_, table_to_check_)
, settings(settings_)
, drain_timeout(settings.drain_timeout)
, allow_changing_replica_until_first_data_packet(settings.allow_changing_replica_until_first_data_packet)
, throttler(throttler_)
{
std::vector<Connection *> connections = hedged_connections_factory.getManyConnections(pool_mode);
@ -251,7 +253,7 @@ Packet HedgedConnections::drain()
while (!epoll.empty())
{
ReplicaLocation location = getReadyReplicaLocation();
ReplicaLocation location = getReadyReplicaLocation(DrainCallback{drain_timeout});
Packet packet = receivePacketFromReplica(location);
switch (packet.type)
{
@ -278,10 +280,10 @@ Packet HedgedConnections::drain()
Packet HedgedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
return receivePacketUnlocked({});
return receivePacketUnlocked({}, false /* is_draining */);
}
Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback)
Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback, bool /* is_draining */)
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
@ -396,7 +398,7 @@ Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & repli
{
/// If we are allowed to change replica until the first data packet,
/// just restart timeout (if it hasn't expired yet). Otherwise disable changing replica with this offset.
if (settings.allow_changing_replica_until_first_data_packet && !replica.is_change_replica_timeout_expired)
if (allow_changing_replica_until_first_data_packet && !replica.is_change_replica_timeout_expired)
replica.change_replica_timeout.setRelative(hedged_connections_factory.getConnectionTimeouts().receive_data_timeout);
else
disableChangingReplica(replica_location);

View File

@ -97,7 +97,7 @@ public:
Packet receivePacket() override;
Packet receivePacketUnlocked(AsyncCallback async_callback) override;
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
void disconnect() override;
@ -189,6 +189,12 @@ private:
Epoll epoll;
const Settings & settings;
/// The following two fields are from settings but can be referenced outside the lifetime of
/// settings when connection is drained asynchronously.
Poco::Timespan drain_timeout;
bool allow_changing_replica_until_first_data_packet;
ThrottlerPtr throttler;
bool sent_query = false;
bool cancelled = false;

View File

@ -0,0 +1,31 @@
#include <Client/IConnections.h>
#include <Poco/Net/SocketImpl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SOCKET_TIMEOUT;
}
/// This wrapper struct allows us to use Poco's socket polling code with a raw fd.
/// The only difference from Poco::Net::SocketImpl is that we don't close the fd in the destructor.
struct PocoSocketWrapper : public Poco::Net::SocketImpl
{
explicit PocoSocketWrapper(int fd)
{
reset(fd);
}
// Do not close fd.
~PocoSocketWrapper() override = default;
};
void IConnections::DrainCallback::operator()(int fd, Poco::Timespan, const std::string fd_description) const
{
if (!PocoSocketWrapper(fd).poll(drain_timeout, Poco::Net::Socket::SELECT_READ))
throw Exception(ErrorCodes::SOCKET_TIMEOUT, "Read timeout while draining from {}", fd_description);
}
}

View File

@ -10,6 +10,12 @@ namespace DB
class IConnections : boost::noncopyable
{
public:
struct DrainCallback
{
Poco::Timespan drain_timeout;
void operator()(int fd, Poco::Timespan, const std::string fd_description = "") const;
};
/// Send all scalars to replicas.
virtual void sendScalarsData(Scalars & data) = 0;
/// Send all content of external tables to replicas.
@ -30,7 +36,7 @@ public:
virtual Packet receivePacket() = 0;
/// Version of `receivePacket` function without locking.
virtual Packet receivePacketUnlocked(AsyncCallback async_callback) = 0;
virtual Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) = 0;
/// Break all active connections.
virtual void disconnect() = 0;

View File

@ -18,7 +18,7 @@ namespace ErrorCodes
MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
{
connection.setThrottler(throttler);
@ -30,9 +30,8 @@ MultiplexedConnections::MultiplexedConnections(Connection & connection, const Se
}
MultiplexedConnections::MultiplexedConnections(
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_)
std::vector<IConnectionPool::Entry> && connections, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
{
/// If we didn't get any connections from pool and getMany() did not throw exceptions, this means that
/// `skip_unavailable_shards` was set. Then just return.
@ -168,7 +167,7 @@ void MultiplexedConnections::sendReadTaskResponse(const String & response)
Packet MultiplexedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
Packet packet = receivePacketUnlocked({});
Packet packet = receivePacketUnlocked({}, false /* is_draining */);
return packet;
}
@ -216,7 +215,7 @@ Packet MultiplexedConnections::drain()
while (hasActiveConnections())
{
Packet packet = receivePacketUnlocked({});
Packet packet = receivePacketUnlocked(DrainCallback{drain_timeout}, true /* is_draining */);
switch (packet.type)
{
@ -264,14 +263,14 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
return buf.str();
}
Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callback)
Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callback, bool is_draining)
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
if (!hasActiveConnections())
throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
ReplicaState & state = getReplicaForReading();
ReplicaState & state = getReplicaForReading(is_draining);
current_connection = state.connection;
if (current_connection == nullptr)
throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
@ -323,9 +322,10 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
return packet;
}
MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading()
MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading(bool is_draining)
{
if (replica_states.size() == 1)
/// Fast path when we only focus on one replica and are not draining the connection.
if (replica_states.size() == 1 && !is_draining)
return replica_states[0];
Poco::Net::Socket::SocketList read_list;
@ -353,10 +353,26 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
read_list.push_back(*connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.receive_timeout);
int n = Poco::Net::Socket::select(
read_list,
write_list,
except_list,
is_draining ? drain_timeout : receive_timeout);
if (n == 0)
throw Exception("Timeout exceeded while reading from " + dumpAddressesUnlocked(), ErrorCodes::TIMEOUT_EXCEEDED);
{
auto err_msg = fmt::format("Timeout exceeded while reading from {}", dumpAddressesUnlocked());
for (ReplicaState & state : replica_states)
{
Connection * connection = state.connection;
if (connection != nullptr)
{
connection->disconnect();
invalidateReplica(state);
}
}
throw Exception(err_msg, ErrorCodes::TIMEOUT_EXCEEDED);
}
}
/// TODO Absolutely wrong code: read_list could be empty; motivation of rand is unclear.

View File

@ -61,7 +61,7 @@ public:
bool hasActiveConnections() const override { return active_connection_count > 0; }
private:
Packet receivePacketUnlocked(AsyncCallback async_callback) override;
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
/// Internal version of `dumpAddresses` function without locking.
std::string dumpAddressesUnlocked() const;
@ -74,7 +74,7 @@ private:
};
/// Get a replica where you can read the data.
ReplicaState & getReplicaForReading();
ReplicaState & getReplicaForReading(bool is_draining);
/// Mark the replica as invalid.
void invalidateReplica(ReplicaState & replica_state);
@ -82,6 +82,11 @@ private:
private:
const Settings & settings;
/// The following two fields are from settings but can be referenced outside the lifetime of
/// settings when connection is drained asynchronously.
Poco::Timespan drain_timeout;
Poco::Timespan receive_timeout;
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count = 0;

View File

@ -71,6 +71,10 @@
M(PartsInMemory, "In-memory parts.") \
M(MMappedFiles, "Total number of mmapped files.") \
M(MMappedFileBytes, "Sum size of mmapped file regions.") \
M(AsyncDrainedConnections, "Number of connections drained asynchronously.") \
M(ActiveAsyncDrainedConnections, "Number of active connections drained asynchronously.") \
M(SyncDrainedConnections, "Number of connections drained synchronously.") \
M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
namespace CurrentMetrics
{

View File

@ -11,6 +11,7 @@
#define DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_SECURE_MS 100
#define DBMS_DEFAULT_SEND_TIMEOUT_SEC 300
#define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
#define DBMS_DEFAULT_DRAIN_TIMEOUT_SEC 3
/// Timeouts for hedged requests.
#define DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS 100
#define DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_MS 2000

View File

@ -54,6 +54,7 @@ class IColumn;
M(Milliseconds, connect_timeout_with_failover_secure_ms, DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_SECURE_MS, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \
M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "", 0) \
M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "", 0) \
M(Seconds, drain_timeout, DBMS_DEFAULT_DRAIN_TIMEOUT_SEC, "", 0) \
M(Seconds, tcp_keep_alive_timeout, 0, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
M(Milliseconds, hedged_connection_timeout_ms, DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
M(Milliseconds, receive_data_timeout_ms, DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_MS, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
@ -233,6 +234,8 @@ class IColumn;
M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \
M(UInt64, unknown_packet_in_send_data, 0, "Send unknown packet instead of data Nth data packet", 0) \
/** Settings for testing connection collector */ \
M(Milliseconds, sleep_in_receive_cancel_ms, 0, "Time to sleep in receiving cancel in TCPHandler", 0) \
\
M(Bool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.", 0) \
M(Seconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.", 0) \

View File

@ -0,0 +1,116 @@
#include <DataStreams/ConnectionCollector.h>
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <common/logger_useful.h>
namespace CurrentMetrics
{
extern const Metric AsyncDrainedConnections;
extern const Metric ActiveAsyncDrainedConnections;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_PACKET_FROM_SERVER;
}
std::unique_ptr<ConnectionCollector> ConnectionCollector::connection_collector;
static constexpr UInt64 max_connection_draining_tasks_per_thread = 20;
ConnectionCollector::ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads)
: WithMutableContext(global_context_), pool(max_threads, max_threads, max_threads * max_connection_draining_tasks_per_thread)
{
}
ConnectionCollector & ConnectionCollector::init(ContextMutablePtr global_context_, size_t max_threads)
{
if (connection_collector)
{
throw Exception("Connection collector is initialized twice. This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
connection_collector.reset(new ConnectionCollector(global_context_, max_threads));
return *connection_collector;
}
struct AsyncDrainTask
{
const ConnectionPoolWithFailoverPtr pool;
std::shared_ptr<IConnections> shared_connections;
void operator()() const
{
ConnectionCollector::drainConnections(*shared_connections);
}
// We don't have std::unique_function yet. Wrap it in shared_ptr to make the functor copyable.
std::shared_ptr<CurrentMetrics::Increment> metric_increment
= std::make_shared<CurrentMetrics::Increment>(CurrentMetrics::ActiveAsyncDrainedConnections);
};
std::shared_ptr<IConnections> ConnectionCollector::enqueueConnectionCleanup(
const ConnectionPoolWithFailoverPtr & pool, std::unique_ptr<IConnections> connections) noexcept
{
if (!connections)
return nullptr;
std::shared_ptr<IConnections> shared_connections = std::move(connections);
if (connection_collector)
{
if (connection_collector->pool.trySchedule(AsyncDrainTask{pool, shared_connections}))
{
CurrentMetrics::add(CurrentMetrics::AsyncDrainedConnections, 1);
return nullptr;
}
}
return shared_connections;
}
void ConnectionCollector::drainConnections(IConnections & connections) noexcept
{
bool is_drained = false;
try
{
Packet packet = connections.drain();
is_drained = true;
switch (packet.type)
{
case Protocol::Server::EndOfStream:
case Protocol::Server::Log:
break;
case Protocol::Server::Exception:
packet.exception->rethrow();
break;
default:
throw Exception(
ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
"Unknown packet {} from one of the following replicas: {}",
toString(packet.type),
connections.dumpAddresses());
}
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__);
if (!is_drained)
{
try
{
connections.disconnect();
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__);
}
}
}
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <Client/IConnections.h>
#include <Interpreters/Context_fwd.h>
#include <boost/noncopyable.hpp>
#include <Common/ThreadPool.h>
namespace DB
{
class ConnectionPoolWithFailover;
using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>;
class ConnectionCollector : boost::noncopyable, WithMutableContext
{
public:
static ConnectionCollector & init(ContextMutablePtr global_context_, size_t max_threads);
static std::shared_ptr<IConnections>
enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::unique_ptr<IConnections> connections) noexcept;
static void drainConnections(IConnections & connections) noexcept;
private:
explicit ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads);
static constexpr size_t reschedule_time_ms = 1000;
ThreadPool pool;
static std::unique_ptr<ConnectionCollector> connection_collector;
};
}

View File

@ -14,10 +14,11 @@ RemoteBlockInputStream::RemoteBlockInputStream(
}
RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query_executor(std::move(connections), query_, header_, context_, throttler, scalars_, external_tables_, stage_)
: query_executor(pool, std::move(connections), query_, header_, context_, throttler, scalars_, external_tables_, stage_)
{
init();
}

View File

@ -31,6 +31,7 @@ public:
/// Accepts several connections already taken from pool.
RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),

View File

@ -1,3 +1,4 @@
#include <DataStreams/ConnectionCollector.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <DataStreams/RemoteQueryExecutorReadContext.h>
@ -17,6 +18,12 @@
#include <Client/HedgedConnections.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
namespace CurrentMetrics
{
extern const Metric SyncDrainedConnections;
extern const Metric ActiveSyncDrainedConnections;
}
namespace DB
{
@ -33,7 +40,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), sync_draining(true)
{
create_connections = [this, &connection, throttler]()
{
@ -42,12 +49,13 @@ RemoteQueryExecutor::RemoteQueryExecutor(
}
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool_,
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), pool(pool_)
{
create_connections = [this, connections_, throttler]() mutable {
return std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
@ -55,14 +63,14 @@ RemoteQueryExecutor::RemoteQueryExecutor(
}
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const ConnectionPoolWithFailoverPtr & pool_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), pool(pool_)
{
create_connections = [this, pool, throttler]()->std::unique_ptr<IConnections>
create_connections = [this, throttler]()->std::unique_ptr<IConnections>
{
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
@ -99,7 +107,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor()
* all connections, then read and skip the remaining packets to make sure
* these connections did not remain hanging in the out-of-sync state.
*/
if (established || isQueryPending())
if (connections && (established || isQueryPending()))
connections->disconnect();
}
@ -406,33 +414,19 @@ void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
/// Send the request to abort the execution of the request, if not already sent.
tryCancel("Cancelling query because enough data has been read", read_context);
/// Get the remaining packets so that there is no out of sync in the connections to the replicas.
Packet packet = connections->drain();
switch (packet.type)
{
case Protocol::Server::EndOfStream:
finished = true;
break;
case Protocol::Server::Log:
/// Pass logs from remote server to client
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
log_queue->pushBlock(std::move(packet.block));
break;
case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;
default:
got_unknown_packet_from_replica = true;
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
toString(packet.type),
connections->dumpAddresses());
/// Finish might be called in multiple threads. Make sure we release connections in thread-safe way.
std::lock_guard guard(connection_draining_mutex);
if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, std::move(connections)))
{
/// Drain connections synchronously.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
ConnectionCollector::drainConnections(*conn);
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
}
}
finished = true;
}
void RemoteQueryExecutor::cancel(std::unique_ptr<ReadContext> * read_context)
{
@ -505,7 +499,6 @@ void RemoteQueryExecutor::sendExternalTables()
}
void RemoteQueryExecutor::tryCancel(const char * reason, std::unique_ptr<ReadContext> * read_context)
{
{
/// Flag was_cancelled is atomic because it is checked in read().
std::lock_guard guard(was_cancelled_mutex);
@ -519,7 +512,6 @@ void RemoteQueryExecutor::tryCancel(const char * reason, std::unique_ptr<ReadCon
(*read_context)->cancel();
connections->sendCancel();
}
if (log)
LOG_TRACE(log, "({}) {}", connections->dumpAddresses(), reason);

View File

@ -36,6 +36,7 @@ public:
using ReadContext = RemoteQueryExecutorReadContext;
/// Takes already set connection.
/// We don't own connection, thus we have to drain it synchronously.
RemoteQueryExecutor(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
@ -44,6 +45,7 @@ public:
/// Accepts several connections already taken from pool.
RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
@ -107,9 +109,6 @@ private:
Block totals;
Block extremes;
std::function<std::unique_ptr<IConnections>()> create_connections;
std::unique_ptr<IConnections> connections;
const String query;
String query_id;
ContextPtr context;
@ -125,6 +124,15 @@ private:
/// Initiator identifier for distributed task processing
std::shared_ptr<TaskIterator> task_iterator;
/// Drain connection synchronously when finishing.
bool sync_draining = false;
std::function<std::unique_ptr<IConnections>()> create_connections;
/// Hold a shared reference to the connection pool so that asynchronous connection draining will
/// work safely. Make sure it's the first member so that we don't destruct it too early.
const ConnectionPoolWithFailoverPtr pool;
std::unique_ptr<IConnections> connections;
/// Streams for reading from temporary tables and following sending of data
/// to remote servers for GLOBAL-subqueries
std::vector<ExternalTablesData> external_tables_data;
@ -151,6 +159,10 @@ private:
std::atomic<bool> was_cancelled { false };
std::mutex was_cancelled_mutex;
/** Thread-safe connection draining.
*/
std::mutex connection_draining_mutex;
/** An exception from replica was received. No need in receiving more packets or
* requesting to cancel query execution
*/

View File

@ -43,7 +43,7 @@ struct RemoteQueryExecutorRoutine
{
while (true)
{
read_context.packet = connections.receivePacketUnlocked(ReadCallback{read_context, sink});
read_context.packet = connections.receivePacketUnlocked(ReadCallback{read_context, sink}, false /* is_draining */);
sink = std::move(sink).resume();
}
}
@ -144,7 +144,7 @@ bool RemoteQueryExecutorReadContext::checkTimeoutImpl(bool blocking)
if (is_timer_alarmed && !is_socket_ready)
{
/// Socket receive timeout. Drain it in case or error, or it may be hide by timeout exception.
/// Socket receive timeout. Drain it in case of error, or it may be hide by timeout exception.
timer.drain();
throw NetException("Timeout exceeded", ErrorCodes::SOCKET_TIMEOUT);
}

View File

@ -172,7 +172,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
String query_string = formattedAST(query);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read);
}

View File

@ -1026,7 +1026,17 @@ bool TCPHandler::receivePacket()
return false;
case Protocol::Client::Cancel:
{
/// For testing connection collector.
const Settings & settings = query_context->getSettingsRef();
if (settings.sleep_in_receive_cancel_ms.totalMilliseconds())
{
std::chrono::milliseconds ms(settings.sleep_in_receive_cancel_ms.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
return false;
}
case Protocol::Client::Hello:
receiveUnexpectedHello();
@ -1063,6 +1073,13 @@ String TCPHandler::receiveReadTaskResponseAssumeLocked()
if (packet_type == Protocol::Client::Cancel)
{
state.is_cancelled = true;
/// For testing connection collector.
const Settings & settings = query_context->getSettingsRef();
if (settings.sleep_in_receive_cancel_ms.totalMilliseconds())
{
std::chrono::milliseconds ms(settings.sleep_in_receive_cancel_ms.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
return {};
}
else
@ -1461,6 +1478,16 @@ bool TCPHandler::isQueryCancelled()
throw NetException("Unexpected packet Cancel received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
LOG_INFO(log, "Query was cancelled.");
state.is_cancelled = true;
/// For testing connection collector.
{
const Settings & settings = query_context->getSettingsRef();
if (settings.sleep_in_receive_cancel_ms.totalMilliseconds())
{
std::chrono::milliseconds ms(settings.sleep_in_receive_cancel_ms.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
}
return true;
default:

View File

@ -4,7 +4,7 @@
#include <Disks/IDisk.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeUUID.h>
@ -57,6 +57,7 @@
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/NullSink.h>
@ -739,9 +740,10 @@ QueryPipelinePtr StorageDistributed::distributedWrite(const ASTInsertQuery & que
"Expected exactly one connection for shard " + toString(shard_info.shard_num), ErrorCodes::LOGICAL_ERROR);
/// INSERT SELECT query returns empty block
auto in_stream = std::make_shared<RemoteBlockInputStream>(std::move(connections), new_query_str, Block{}, local_context);
auto remote_query_executor
= std::make_shared<RemoteQueryExecutor>(shard_info.pool, std::move(connections), new_query_str, Block{}, local_context);
pipelines.emplace_back(std::make_unique<QueryPipeline>());
pipelines.back()->init(Pipe(std::make_shared<SourceFromInputStream>(std::move(in_stream))));
pipelines.back()->init(Pipe(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote)));
pipelines.back()->setSinks([](const Block & header, QueryPipeline::StreamType) -> ProcessorPtr
{
return std::make_shared<EmptySink>(header);

View File

@ -126,8 +126,15 @@ Pipe StorageS3Cluster::read(
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
*connections.back(), queryToString(query_info.query), header, context,
/*throttler=*/nullptr, scalars, Tables(), processed_stage, callback);
*connections.back(),
queryToString(query_info.query),
header,
context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
callback);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}

View File

@ -0,0 +1,4 @@
<?xml version="1.0"?>
<yandex>
<max_concurrent_queries>10000</max_concurrent_queries>
</yandex>

View File

@ -0,0 +1,36 @@
import os
import sys
import time
from multiprocessing.dummy import Pool
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/config.xml"])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node.query(
'create table t (number UInt64) engine = Distributed(test_cluster_two_shards, system, numbers);'
)
yield cluster
finally:
cluster.shutdown()
def test_filled_async_drain_connection_pool(started_cluster):
busy_pool = Pool(10)
def execute_query(i):
for _ in range(100):
node.query('select * from t where number = 0 limit 2;',
settings={
"sleep_in_receive_cancel_ms": 10000000,
"max_execution_time": 5
})
p = busy_pool.map(execute_query, range(10))

View File

@ -0,0 +1,2 @@
0
0

View File

@ -0,0 +1,6 @@
drop table if exists t;
create table t (number UInt64) engine = Distributed(test_cluster_two_shards, system, numbers);
select * from t where number = 0 limit 2 settings sleep_in_receive_cancel_ms = 10000, max_execution_time = 5;
drop table t;