Merge branch 'master' into disable-logical-expression-optimizer-with-aliases

This commit is contained in:
Alexey Milovidov 2023-03-13 21:41:55 +03:00 committed by GitHub
commit 741acf18c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 140 additions and 442 deletions

View File

@ -6,25 +6,26 @@ sidebar_label: clickhouse-local
# clickhouse-local
The `clickhouse-local` program enables you to perform fast processing on local files, without having to deploy and configure the ClickHouse server.
The `clickhouse-local` program enables you to perform fast processing on local files, without having to deploy and configure the ClickHouse server. It accepts data that represent tables and queries them using [ClickHouse SQL dialect](../../sql-reference/). `clickhouse-local` uses the same core as ClickHouse server, so it supports most of the features and the same set of formats and table engines.
Accepts data that represent tables and queries them using [ClickHouse SQL dialect](../../sql-reference/).
`clickhouse-local` uses the same core as ClickHouse server, so it supports most of the features and the same set of formats and table engines.
By default `clickhouse-local` does not have access to data on the same host, but it supports loading server configuration using `--config-file` argument.
For temporary data, a unique temporary data directory is created by default.
By default `clickhouse-local` has access to data on the same host, and it does not depend on the server's configuration. It also supports loading server configuration using `--config-file` argument. For temporary data, a unique temporary data directory is created by default.
## Usage {#usage}
Basic usage:
Basic usage (Linux):
``` bash
$ clickhouse-local --structure "table_structure" --input-format "format_of_incoming_data" \
--query "query"
$ clickhouse-local --structure "table_structure" --input-format "format_of_incoming_data" --query "query"
```
Basic usage (Mac):
``` bash
$ ./clickhouse local --structure "table_structure" --input-format "format_of_incoming_data" --query "query"
```
Also supported on Windows through WSL2.
Arguments:
- `-S`, `--structure` — table structure for input data.

View File

@ -1757,8 +1757,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
LOG_INFO(log, "All helping tables dropped partition {}", partition_name);
}
String ClusterCopier::getRemoteCreateTable(
const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
{
auto remote_context = Context::createCopy(context);
remote_context->setSettings(settings);
@ -1777,8 +1776,10 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time
{
/// Fetch and parse (possibly) new definition
auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_pull, true);
String create_query_pull_str
= getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry, task_cluster->settings_pull);
String create_query_pull_str = getRemoteCreateTable(
task_shard.task_table.table_pull,
*connection_entry,
task_cluster->settings_pull);
ParserCreateQuery parser_create_query;
const auto & settings = getContext()->getSettingsRef();
@ -2025,8 +2026,8 @@ UInt64 ClusterCopier::executeQueryOnCluster(
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
*connections.back(), query, header, getContext(),
/*throttler=*/nullptr, Scalars(), Tables(), QueryProcessingStage::Complete);
*connections.back(), query, header, getContext(),
/*throttler=*/nullptr, Scalars(), Tables(), QueryProcessingStage::Complete);
try
{

View File

@ -67,7 +67,6 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
#include <Storages/registerStorages.h>
#include <QueryPipeline/ConnectionCollector.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <IO/Resource/registerSchedulerNodes.h>
@ -816,8 +815,6 @@ try
}
);
ConnectionCollector::init(global_context, server_settings.max_threads_for_connection_collector);
bool has_zookeeper = config().has("zookeeper");
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });

View File

@ -348,10 +348,6 @@
<background_distributed_schedule_pool_size>16</background_distributed_schedule_pool_size>
-->
<!-- Number of workers to recycle connections in background (see also drain_timeout).
If the pool is full, connection will be drained synchronously. -->
<!-- <max_threads_for_connection_collector>10</max_threads_for_connection_collector> -->
<!-- On memory constrained environments you may have to set this to value larger than 1.
-->
<max_server_memory_usage_to_ram_ratio>0.9</max_server_memory_usage_to_ram_ratio>

View File

@ -31,8 +31,6 @@ HedgedConnections::HedgedConnections(
: hedged_connections_factory(pool_, &context_->getSettingsRef(), timeouts_, table_to_check_)
, context(std::move(context_))
, settings(context->getSettingsRef())
, drain_timeout(settings.drain_timeout)
, allow_changing_replica_until_first_data_packet(settings.allow_changing_replica_until_first_data_packet)
, throttler(throttler_)
{
std::vector<Connection *> connections = hedged_connections_factory.getManyConnections(pool_mode);
@ -263,7 +261,7 @@ Packet HedgedConnections::drain()
while (!epoll.empty())
{
ReplicaLocation location = getReadyReplicaLocation(DrainCallback{drain_timeout});
ReplicaLocation location = getReadyReplicaLocation();
Packet packet = receivePacketFromReplica(location);
switch (packet.type)
{
@ -290,10 +288,10 @@ Packet HedgedConnections::drain()
Packet HedgedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
return receivePacketUnlocked({}, false /* is_draining */);
return receivePacketUnlocked({});
}
Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback, bool /* is_draining */)
Packet HedgedConnections::receivePacketUnlocked(AsyncCallback async_callback)
{
if (!sent_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot receive packets: no query sent.");
@ -413,7 +411,7 @@ Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & repli
{
/// If we are allowed to change replica until the first data packet,
/// just restart timeout (if it hasn't expired yet). Otherwise disable changing replica with this offset.
if (allow_changing_replica_until_first_data_packet && !replica.is_change_replica_timeout_expired)
if (settings.allow_changing_replica_until_first_data_packet && !replica.is_change_replica_timeout_expired)
replica.change_replica_timeout.setRelative(hedged_connections_factory.getConnectionTimeouts().receive_data_timeout);
else
disableChangingReplica(replica_location);

View File

@ -101,7 +101,7 @@ public:
Packet receivePacket() override;
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
Packet receivePacketUnlocked(AsyncCallback async_callback) override;
void disconnect() override;
@ -196,12 +196,6 @@ private:
Epoll epoll;
ContextPtr context;
const Settings & settings;
/// The following two fields are from settings but can be referenced outside the lifetime of
/// settings when connection is drained asynchronously.
Poco::Timespan drain_timeout;
bool allow_changing_replica_until_first_data_packet;
ThrottlerPtr throttler;
bool sent_query = false;
bool cancelled = false;

View File

@ -1,36 +0,0 @@
#include <Client/IConnections.h>
#include <Poco/Net/SocketImpl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SOCKET_TIMEOUT;
}
/// This wrapper struct allows us to use Poco's socket polling code with a raw fd.
/// The only difference from Poco::Net::SocketImpl is that we don't close the fd in the destructor.
struct PocoSocketWrapper : public Poco::Net::SocketImpl
{
explicit PocoSocketWrapper(int fd)
{
reset(fd);
}
// Do not close fd.
~PocoSocketWrapper() override { reset(-1); }
};
void IConnections::DrainCallback::operator()(int fd, Poco::Timespan, const std::string & fd_description) const
{
if (!PocoSocketWrapper(fd).poll(drain_timeout, Poco::Net::Socket::SELECT_READ))
{
throw Exception(ErrorCodes::SOCKET_TIMEOUT,
"Read timeout ({} ms) while draining from {}",
drain_timeout.totalMilliseconds(),
fd_description);
}
}
}

View File

@ -13,12 +13,6 @@ namespace DB
class IConnections : boost::noncopyable
{
public:
struct DrainCallback
{
Poco::Timespan drain_timeout;
void operator()(int fd, Poco::Timespan, const std::string & fd_description = "") const;
};
/// Send all scalars to replicas.
virtual void sendScalarsData(Scalars & data) = 0;
/// Send all content of external tables to replicas.
@ -40,7 +34,7 @@ public:
virtual Packet receivePacket() = 0;
/// Version of `receivePacket` function without locking.
virtual Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) = 0;
virtual Packet receivePacketUnlocked(AsyncCallback async_callback) = 0;
/// Break all active connections.
virtual void disconnect() = 0;

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
: settings(settings_)
{
connection.setThrottler(throttler);
@ -33,7 +33,7 @@ MultiplexedConnections::MultiplexedConnections(Connection & connection, const Se
MultiplexedConnections::MultiplexedConnections(std::shared_ptr<Connection> connection_ptr_, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
: settings(settings_)
, connection_ptr(connection_ptr_)
{
connection_ptr->setThrottler(throttler);
@ -46,8 +46,9 @@ MultiplexedConnections::MultiplexedConnections(std::shared_ptr<Connection> conne
}
MultiplexedConnections::MultiplexedConnections(
std::vector<IConnectionPool::Entry> && connections, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_)
{
/// If we didn't get any connections from pool and getMany() did not throw exceptions, this means that
/// `skip_unavailable_shards` was set. Then just return.
@ -206,7 +207,7 @@ void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadRes
Packet MultiplexedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
Packet packet = receivePacketUnlocked({}, false /* is_draining */);
Packet packet = receivePacketUnlocked({});
return packet;
}
@ -254,7 +255,7 @@ Packet MultiplexedConnections::drain()
while (hasActiveConnections())
{
Packet packet = receivePacketUnlocked(DrainCallback{drain_timeout}, true /* is_draining */);
Packet packet = receivePacketUnlocked({});
switch (packet.type)
{
@ -304,14 +305,14 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
return buf.str();
}
Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callback, bool is_draining)
Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callback)
{
if (!sent_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot receive packets: no query sent.");
if (!hasActiveConnections())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No more packets are available.");
ReplicaState & state = getReplicaForReading(is_draining);
ReplicaState & state = getReplicaForReading();
current_connection = state.connection;
if (current_connection == nullptr)
throw Exception(ErrorCodes::NO_AVAILABLE_REPLICA, "Logical error: no available replica");
@ -366,10 +367,9 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
return packet;
}
MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading(bool is_draining)
MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading()
{
/// Fast path when we only focus on one replica and are not draining the connection.
if (replica_states.size() == 1 && !is_draining)
if (replica_states.size() == 1)
return replica_states[0];
Poco::Net::Socket::SocketList read_list;
@ -390,7 +390,7 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
auto timeout = is_draining ? drain_timeout : receive_timeout;
auto timeout = settings.receive_timeout;
int n = 0;
/// EINTR loop
@ -417,9 +417,7 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
break;
}
/// We treat any error as timeout for simplicity.
/// And we also check if read_list is still empty just in case.
if (n <= 0 || read_list.empty())
if (n == 0)
{
const auto & addresses = dumpAddressesUnlocked();
for (ReplicaState & state : replica_states)
@ -438,7 +436,9 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
}
}
/// TODO Motivation of rand is unclear.
/// TODO Absolutely wrong code: read_list could be empty; motivation of rand is unclear.
/// This code path is disabled by default.
auto & socket = read_list[thread_local_rng() % read_list.size()];
if (fd_to_replica_state_idx.empty())
{

View File

@ -65,7 +65,7 @@ public:
void setReplicaInfo(ReplicaInfo value) override { replica_info = value; }
private:
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
Packet receivePacketUnlocked(AsyncCallback async_callback) override;
/// Internal version of `dumpAddresses` function without locking.
std::string dumpAddressesUnlocked() const;
@ -78,18 +78,13 @@ private:
};
/// Get a replica where you can read the data.
ReplicaState & getReplicaForReading(bool is_draining);
ReplicaState & getReplicaForReading();
/// Mark the replica as invalid.
void invalidateReplica(ReplicaState & replica_state);
const Settings & settings;
/// The following two fields are from settings but can be referenced outside the lifetime of
/// settings when connection is drained asynchronously.
Poco::Timespan drain_timeout;
Poco::Timespan receive_timeout;
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count = 0;

View File

@ -84,10 +84,6 @@
M(MMappedFileBytes, "Sum size of mmapped file regions.") \
M(MMappedAllocs, "Total number of mmapped allocations") \
M(MMappedAllocBytes, "Sum bytes of mmapped allocations") \
M(AsyncDrainedConnections, "Number of connections drained asynchronously.") \
M(ActiveAsyncDrainedConnections, "Number of active connections drained asynchronously.") \
M(SyncDrainedConnections, "Number of connections drained synchronously.") \
M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
M(PendingAsyncInsert, "Number of asynchronous inserts that are waiting for flush.") \
M(KafkaConsumers, "Number of active Kafka consumers") \

View File

@ -57,7 +57,6 @@ class IColumn;
M(Milliseconds, connect_timeout_with_failover_secure_ms, 100, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \
M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "Timeout for receiving data from network, in seconds. If no bytes were received in this interval, exception is thrown. If you set this setting on client, the 'send_timeout' for the socket will be also set on the corresponding connection end on the server.", 0) \
M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "Timeout for sending data to network, in seconds. If client needs to sent some data, but it did not able to send any bytes in this interval, exception is thrown. If you set this setting on client, the 'receive_timeout' for the socket will be also set on the corresponding connection end on the server.", 0) \
M(Seconds, drain_timeout, 3, "Timeout for draining remote connections, -1 means synchronous drain without ignoring errors", 0) \
M(Seconds, tcp_keep_alive_timeout, 290 /* less than DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC */, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
M(Milliseconds, hedged_connection_timeout_ms, 100, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
M(Milliseconds, receive_data_timeout_ms, 2000, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
@ -284,8 +283,6 @@ class IColumn;
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \
M(Milliseconds, sleep_after_receiving_query_ms, 0, "Time to sleep after receiving query in TCPHandler", 0) \
M(UInt64, unknown_packet_in_send_data, 0, "Send unknown packet instead of data Nth data packet", 0) \
/** Settings for testing connection collector */ \
M(Milliseconds, sleep_in_receive_cancel_ms, 0, "Time to sleep in receiving cancel in TCPHandler", 0) \
\
M(Bool, insert_allow_materialized_columns, false, "If setting is enabled, Allow materialized columns in INSERT.", 0) \
M(Seconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.", 0) \
@ -759,7 +756,7 @@ class IColumn;
MAKE_OBSOLETE(M, Seconds, temporary_live_view_timeout, 1) \
MAKE_OBSOLETE(M, Milliseconds, async_insert_cleanup_timeout_ms, 1000) \
MAKE_OBSOLETE(M, Bool, optimize_fuse_sum_count_avg, 0) \
MAKE_OBSOLETE(M, Seconds, drain_timeout, 3) \
/** The section above is for obsolete settings. Do not add anything there. */

View File

@ -22,6 +22,8 @@
#include <filesystem>
#include <Common/logger_useful.h>
#define ORDINARY_TO_ATOMIC_PREFIX ".tmp_convert."
namespace fs = std::filesystem;
namespace DB
@ -117,6 +119,37 @@ static void checkUnsupportedVersion(ContextMutablePtr context, const String & da
"If so, you should upgrade through intermediate version.", database_name);
}
static void checkIncompleteOrdinaryToAtomicConversion(ContextPtr context, const std::map<String, String> & databases)
{
if (context->getConfigRef().has("allow_reserved_database_name_tmp_convert"))
return;
auto convert_flag_path = fs::path(context->getFlagsPath()) / "convert_ordinary_to_atomic";
if (!fs::exists(convert_flag_path))
return;
/// Flag exists. Let's check if we had an unsuccessful conversion attempt previously
for (const auto & db : databases)
{
if (!db.first.starts_with(ORDINARY_TO_ATOMIC_PREFIX))
continue;
size_t last_dot = db.first.rfind('.');
if (last_dot <= strlen(ORDINARY_TO_ATOMIC_PREFIX))
continue;
String actual_name = db.first.substr(strlen(ORDINARY_TO_ATOMIC_PREFIX), last_dot - strlen(ORDINARY_TO_ATOMIC_PREFIX));
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Found a database with special name: {}. "
"Most likely it indicates that conversion of database {} from Ordinary to Atomic "
"was interrupted or failed in the middle. You can add <allow_reserved_database_name_tmp_convert> to config.xml "
"or remove convert_ordinary_to_atomic file from flags/ directory, so the server will start forcefully. "
"After starting the server, you can finish conversion manually by moving rest of the tables from {} to {} "
"(using RENAME TABLE) and executing DROP DATABASE {} and RENAME DATABASE {} TO {}",
backQuote(db.first), backQuote(actual_name), backQuote(actual_name), backQuote(db.first),
backQuote(actual_name), backQuote(db.first), backQuote(actual_name));
}
}
void loadMetadata(ContextMutablePtr context, const String & default_database_name)
{
Poco::Logger * log = &Poco::Logger::get("loadMetadata");
@ -168,6 +201,8 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
}
}
checkIncompleteOrdinaryToAtomicConversion(context, databases);
/// clickhouse-local creates DatabaseMemory as default database by itself
/// For clickhouse-server we need create default database
bool create_default_db_if_not_exists = !default_database_name.empty();
@ -324,7 +359,7 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
database_name, fmt::join(permanently_detached_tables, ", "));
}
String tmp_name = fmt::format(".tmp_convert.{}.{}", database_name, thread_local_rng());
String tmp_name = fmt::format(ORDINARY_TO_ATOMIC_PREFIX"{}.{}", database_name, thread_local_rng());
try
{
@ -415,11 +450,13 @@ void convertDatabasesEnginesIfNeed(ContextMutablePtr context)
LOG_INFO(&Poco::Logger::get("loadMetadata"), "Found convert_ordinary_to_atomic file in flags directory, "
"will try to convert all Ordinary databases to Atomic");
fs::remove(convert_flag_path);
for (const auto & [name, _] : DatabaseCatalog::instance().getDatabases())
if (name != DatabaseCatalog::SYSTEM_DATABASE)
maybeConvertOrdinaryDatabaseToAtomic(context, name, /* tables_started */ true);
LOG_INFO(&Poco::Logger::get("loadMetadata"), "Conversion finished, removing convert_ordinary_to_atomic flag");
fs::remove(convert_flag_path);
}
void startupSystemTables()

View File

@ -202,7 +202,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read);
QueryPipelineBuilder builder;

View File

@ -1,124 +0,0 @@
#include <QueryPipeline/ConnectionCollector.h>
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include "Core/Protocol.h"
#include <Common/logger_useful.h>
namespace CurrentMetrics
{
extern const Metric AsyncDrainedConnections;
extern const Metric ActiveAsyncDrainedConnections;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_PACKET_FROM_SERVER;
}
std::unique_ptr<ConnectionCollector> ConnectionCollector::connection_collector;
static constexpr UInt64 max_connection_draining_tasks_per_thread = 20;
ConnectionCollector::ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads)
: WithMutableContext(global_context_), pool(max_threads, max_threads, max_threads * max_connection_draining_tasks_per_thread)
{
}
ConnectionCollector & ConnectionCollector::init(ContextMutablePtr global_context_, size_t max_threads)
{
if (connection_collector)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Connection collector is initialized twice. This is a bug");
}
connection_collector.reset(new ConnectionCollector(global_context_, max_threads));
return *connection_collector;
}
struct AsyncDrainTask
{
const ConnectionPoolWithFailoverPtr pool;
std::shared_ptr<IConnections> shared_connections;
void operator()() const
{
ConnectionCollector::drainConnections(*shared_connections, /* throw_error= */ false);
}
// We don't have std::unique_function yet. Wrap it in shared_ptr to make the functor copyable.
std::shared_ptr<CurrentMetrics::Increment> metric_increment
= std::make_shared<CurrentMetrics::Increment>(CurrentMetrics::ActiveAsyncDrainedConnections);
};
std::shared_ptr<IConnections> ConnectionCollector::enqueueConnectionCleanup(
const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr<IConnections> connections) noexcept
{
if (!connections)
return nullptr;
if (connection_collector)
{
if (connection_collector->pool.trySchedule(AsyncDrainTask{pool, connections}))
{
CurrentMetrics::add(CurrentMetrics::AsyncDrainedConnections, 1);
return nullptr;
}
}
return connections;
}
void ConnectionCollector::drainConnections(IConnections & connections, bool throw_error)
{
bool is_drained = false;
try
{
Packet packet = connections.drain();
is_drained = true;
switch (packet.type)
{
case Protocol::Server::EndOfStream:
case Protocol::Server::Log:
case Protocol::Server::ProfileEvents:
break;
case Protocol::Server::Exception:
packet.exception->rethrow();
break;
default:
/// Connection should be closed in case of unexpected packet,
/// since this means that the connection in some bad state.
is_drained = false;
throw NetException(
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet {} from one of the following replicas: {}. (expected EndOfStream, Log, ProfileEvents or Exception)",
Protocol::Server::toString(packet.type),
connections.dumpAddresses());
}
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__);
if (!is_drained)
{
try
{
connections.disconnect();
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__);
}
}
if (throw_error)
throw;
}
}
}

View File

@ -1,30 +0,0 @@
#pragma once
#include <Client/IConnections.h>
#include <Interpreters/Context_fwd.h>
#include <boost/noncopyable.hpp>
#include <Common/ThreadPool.h>
namespace DB
{
class ConnectionPoolWithFailover;
using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>;
class ConnectionCollector : boost::noncopyable, WithMutableContext
{
public:
static ConnectionCollector & init(ContextMutablePtr global_context_, size_t max_threads);
static std::shared_ptr<IConnections>
enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr<IConnections> connections) noexcept;
static void drainConnections(IConnections & connections, bool throw_error);
private:
explicit ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads);
static constexpr size_t reschedule_time_ms = 1000;
ThreadPool pool;
static std::unique_ptr<ConnectionCollector> connection_collector;
};
}

View File

@ -1,6 +1,4 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <QueryPipeline/ConnectionCollector.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
@ -25,12 +23,6 @@
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
namespace CurrentMetrics
{
extern const Metric SyncDrainedConnections;
extern const Metric ActiveSyncDrainedConnections;
}
namespace ProfileEvents
{
extern const Event ReadTaskRequestsReceived;
@ -67,7 +59,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
{
create_connections = [this, &connection, throttler, extension_]()
{
auto res = std::make_shared<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);
auto res = std::make_unique<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
@ -83,7 +75,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
{
create_connections = [this, connection_ptr, throttler, extension_]()
{
auto res = std::make_shared<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler);
auto res = std::make_unique<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
@ -91,7 +83,6 @@ RemoteQueryExecutor::RemoteQueryExecutor(
}
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool_,
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
@ -100,10 +91,9 @@ RemoteQueryExecutor::RemoteQueryExecutor(
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
, task_iterator(extension_ ? extension_->task_iterator : nullptr)
, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
, pool(pool_)
{
create_connections = [this, connections_, throttler, extension_]() mutable {
auto res = std::make_shared<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
@ -111,7 +101,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
}
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool_,
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
@ -119,9 +109,8 @@ RemoteQueryExecutor::RemoteQueryExecutor(
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
, task_iterator(extension_ ? extension_->task_iterator : nullptr)
, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
, pool(pool_)
{
create_connections = [this, throttler, extension_]()->std::shared_ptr<IConnections>
create_connections = [this, pool, throttler, extension_]()->std::unique_ptr<IConnections>
{
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
@ -133,7 +122,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
if (main_table)
table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());
auto res = std::make_shared<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check);
auto res = std::make_unique<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
@ -151,7 +140,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
else
connection_entries = pool->getMany(timeouts, &current_settings, pool_mode);
auto res = std::make_shared<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
auto res = std::make_unique<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
@ -535,26 +524,38 @@ void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
/// Send the request to abort the execution of the request, if not already sent.
tryCancel("Cancelling query because enough data has been read", read_context);
if (context->getSettingsRef().drain_timeout != Poco::Timespan(-1000000))
/// Get the remaining packets so that there is no out of sync in the connections to the replicas.
Packet packet = connections->drain();
switch (packet.type)
{
auto connections_left = ConnectionCollector::enqueueConnectionCleanup(pool, connections);
if (connections_left)
{
/// Drain connections synchronously and suppress errors.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
ConnectionCollector::drainConnections(*connections_left, /* throw_error= */ false);
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
}
}
else
{
/// Drain connections synchronously without suppressing errors.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
ConnectionCollector::drainConnections(*connections, /* throw_error= */ true);
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
}
case Protocol::Server::EndOfStream:
finished = true;
break;
finished = true;
case Protocol::Server::Log:
/// Pass logs from remote server to client
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
log_queue->pushBlock(std::move(packet.block));
break;
case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;
case Protocol::Server::ProfileEvents:
/// Pass profile events from remote server to client
if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue())
if (!profile_queue->emplace(std::move(packet.block)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
break;
default:
got_unknown_packet_from_replica = true;
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
toString(packet.type),
connections->dumpAddresses());
}
}
void RemoteQueryExecutor::cancel(std::unique_ptr<ReadContext> * read_context)

View File

@ -52,7 +52,6 @@ public:
};
/// Takes already set connection.
/// We don't own connection, thus we have to drain it synchronously.
RemoteQueryExecutor(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
@ -68,7 +67,6 @@ public:
/// Accepts several connections already taken from pool.
RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
@ -191,6 +189,9 @@ private:
Block totals;
Block extremes;
std::function<std::unique_ptr<IConnections>()> create_connections;
std::unique_ptr<IConnections> connections;
const String query;
String query_id;
ContextPtr context;
@ -213,12 +214,6 @@ private:
/// about the number of the current replica or the count of replicas at all.
IConnections::ReplicaInfo replica_info;
std::function<std::shared_ptr<IConnections>()> create_connections;
/// Hold a shared reference to the connection pool so that asynchronous connection draining will
/// work safely. Make sure it's the first member so that we don't destruct it too early.
const ConnectionPoolWithFailoverPtr pool;
std::shared_ptr<IConnections> connections;
/// Streams for reading from temporary tables and following sending of data
/// to remote servers for GLOBAL-subqueries
std::vector<ExternalTablesData> external_tables_data;

View File

@ -44,7 +44,7 @@ struct RemoteQueryExecutorRoutine
{
while (true)
{
read_context.packet = connections.receivePacketUnlocked(ReadCallback{read_context, sink}, false /* is_draining */);
read_context.packet = connections.receivePacketUnlocked(ReadCallback{read_context, sink});
sink = std::move(sink).resume();
}
}
@ -147,7 +147,7 @@ bool RemoteQueryExecutorReadContext::checkTimeoutImpl(bool blocking)
if (is_timer_alarmed && !is_socket_ready)
{
/// Socket receive timeout. Drain it in case of error, or it may be hide by timeout exception.
/// Socket receive timeout. Drain it in case or error, or it may be hide by timeout exception.
timer.drain();
throw NetException(ErrorCodes::SOCKET_TIMEOUT, "Timeout exceeded");
}

View File

@ -643,7 +643,6 @@ void TCPHandler::extractConnectionSettingsFromContext(const ContextPtr & context
interactive_delay = settings.interactive_delay;
sleep_in_send_tables_status = settings.sleep_in_send_tables_status_ms;
unknown_packet_in_send_data = settings.unknown_packet_in_send_data;
sleep_in_receive_cancel = settings.sleep_in_receive_cancel_ms;
sleep_after_receiving_query = settings.sleep_after_receiving_query_ms;
}
@ -1318,19 +1317,9 @@ bool TCPHandler::receivePacket()
return false;
case Protocol::Client::Cancel:
{
/// For testing connection collector.
if (unlikely(sleep_in_receive_cancel.totalMilliseconds()))
{
std::chrono::milliseconds ms(sleep_in_receive_cancel.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the query");
state.is_cancelled = true;
return false;
}
case Protocol::Client::Hello:
receiveUnexpectedHello();
@ -1372,12 +1361,6 @@ String TCPHandler::receiveReadTaskResponseAssumeLocked()
{
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the read task");
state.is_cancelled = true;
/// For testing connection collector.
if (unlikely(sleep_in_receive_cancel.totalMilliseconds()))
{
std::chrono::milliseconds ms(sleep_in_receive_cancel.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
return {};
}
else
@ -1406,12 +1389,6 @@ std::optional<ParallelReadResponse> TCPHandler::receivePartitionMergeTreeReadTas
{
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the MergeTree read task");
state.is_cancelled = true;
/// For testing connection collector.
if (unlikely(sleep_in_receive_cancel.totalMilliseconds()))
{
std::chrono::milliseconds ms(sleep_in_receive_cancel.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
return std::nullopt;
}
else
@ -1822,15 +1799,6 @@ bool TCPHandler::isQueryCancelled()
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Cancel received from client");
LOG_INFO(log, "Query was cancelled.");
state.is_cancelled = true;
/// For testing connection collector.
{
if (unlikely(sleep_in_receive_cancel.totalMilliseconds()))
{
std::chrono::milliseconds ms(sleep_in_receive_cancel.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
}
return true;
default:

View File

@ -170,7 +170,6 @@ private:
UInt64 interactive_delay = 100000;
Poco::Timespan sleep_in_send_tables_status;
UInt64 unknown_packet_in_send_data = 0;
Poco::Timespan sleep_in_receive_cancel;
Poco::Timespan sleep_after_receiving_query;
std::unique_ptr<Session> session;

View File

@ -110,7 +110,6 @@ Pipe StorageHDFSCluster::read(
for (auto & try_result : try_results)
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard_info.pool,
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
header,

View File

@ -1262,7 +1262,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
/// INSERT SELECT query returns empty block
auto remote_query_executor
= std::make_shared<RemoteQueryExecutor>(shard_info.pool, std::move(connections), new_query_str, Block{}, query_context);
= std::make_shared<RemoteQueryExecutor>(std::move(connections), new_query_str, Block{}, query_context);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));

View File

@ -146,7 +146,6 @@ Pipe StorageS3Cluster::read(
for (auto & try_result : try_results)
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard_info.pool,
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
sample_block,

View File

@ -1,3 +0,0 @@
<clickhouse>
<max_concurrent_queries>10000</max_concurrent_queries>
</clickhouse>

View File

@ -1,41 +0,0 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/config.xml"])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node.query(
"""
create table t (number UInt64)
engine = Distributed(test_cluster_two_shards, system, numbers)
"""
)
yield cluster
finally:
cluster.shutdown()
def test_filled_async_drain_connection_pool(started_cluster):
def execute_queries(_):
for _ in range(100):
node.query(
"select * from t where number = 0 limit 2",
settings={
"sleep_in_receive_cancel_ms": int(10e6),
"max_execution_time": 5,
# decrease drain_timeout to make test more stable
# (another way is to increase max_execution_time, but this will make test slower)
"drain_timeout": 1,
},
)
any(map(execute_queries, range(10)))

View File

@ -188,7 +188,18 @@ def check_convert_all_dbs_to_atomic():
node.exec_in_container(
["bash", "-c", f"touch /var/lib/clickhouse/flags/convert_ordinary_to_atomic"]
)
node.restart_clickhouse()
node.stop_clickhouse()
cannot_start = False
try:
node.start_clickhouse()
except:
cannot_start = True
assert cannot_start
node.exec_in_container(
["bash", "-c", f"rm /var/lib/clickhouse/flags/convert_ordinary_to_atomic"]
)
node.start_clickhouse()
assert "Ordinary\n" == node.query(
"SELECT engine FROM system.databases where name='ordinary'"

View File

@ -1,6 +0,0 @@
drop table if exists t;
create table t (number UInt64) engine = Distributed(test_cluster_two_shards, system, numbers);
select * from t where number = 0 limit 2 settings sleep_in_receive_cancel_ms = 10000, max_execution_time = 5;
drop table t;

View File

@ -1,2 +0,0 @@
OK: sync drain
OK: async drain

View File

@ -1,30 +0,0 @@
#!/usr/bin/env bash
# Tags: no-parallel
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# sync drain
for _ in {1..100}; do
prev=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'SyncDrainedConnections'")
curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select * from remote('127.{2,3}', view(select * from numbers(1e6))) limit 100 settings drain_timeout=-1 format Null"
now=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'SyncDrainedConnections'")
if [[ "$prev" != $(( now-2 )) ]]; then
continue
fi
echo "OK: sync drain"
break
done
# async drain
for _ in {1..100}; do
prev=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'AsyncDrainedConnections'")
curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select * from remote('127.{2,3}', view(select * from numbers(1e6))) limit 100 settings drain_timeout=10 format Null"
now=$(curl -d@- -sS "${CLICKHOUSE_URL}" <<<"select value from system.metrics where metric = 'AsyncDrainedConnections'")
if [[ "$prev" != $(( now-2 )) ]]; then
continue
fi
echo "OK: async drain"
break
done

View File

@ -12,10 +12,4 @@ settings
-- This is to activate as much signals as possible to trigger EINTR
query_profiler_real_time_period_ns=1,
-- This is to use MultiplexedConnections
use_hedged_requests=0,
-- This is to make the initiator waiting for cancel packet in MultiplexedConnections::getReplicaForReading()
--
-- NOTE: that even smaller sleep will be enough to trigger this problem
-- with 100% probability, however just to make it more reliable, increase
-- it to 2 seconds.
sleep_in_receive_cancel_ms=2000;
use_hedged_requests=0;