Merge pull request #4558 from podshumok/distributred-timeout

Distributed and remote timeouts
This commit is contained in:
alesapin 2019-06-17 13:58:17 +03:00 committed by GitHub
commit e45d727e9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 428 additions and 169 deletions

View File

@ -54,10 +54,10 @@ public:
const String & host_, UInt16 port_, bool secure_, const String & default_database_,
const String & user_, const String & password_, const String & stage,
bool randomize_, size_t max_iterations_, double max_time_,
const String & json_path_, const ConnectionTimeouts & timeouts, const Settings & settings_)
const String & json_path_, const Settings & settings_)
:
concurrency(concurrency_), delay(delay_), queue(concurrency),
connections(concurrency, host_, port_, default_database_, user_, password_, timeouts, "benchmark", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
connections(concurrency, host_, port_, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
{
@ -240,7 +240,8 @@ private:
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
for (size_t i = 0; i < concurrency; ++i)
pool.schedule(std::bind(&Benchmark::thread, this, connections.get()));
pool.schedule(std::bind(&Benchmark::thread, this,
connections.get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
InterruptListener interrupt_listener;
info_per_interval.watch.restart();
@ -310,7 +311,9 @@ private:
void execute(ConnectionPool::Entry & connection, Query & query)
{
Stopwatch watch;
RemoteBlockInputStream stream(*connection, query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage);
RemoteBlockInputStream stream(
*connection,
query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage);
Progress progress;
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
@ -485,7 +488,6 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["iterations"].as<size_t>(),
options["timelimit"].as<double>(),
options["json"].as<std::string>(),
ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings),
settings);
return benchmark.run();
}

View File

@ -204,7 +204,6 @@ private:
ConnectionParameters connection_parameters;
void initialize(Poco::Util::Application & self)
{
Poco::Util::Application::initialize(self);
@ -337,7 +336,7 @@ private:
DateLUT::instance();
if (!context.getSettingsRef().use_client_time_zone)
{
const auto & time_zone = connection->getServerTimezone();
const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
if (!time_zone.empty())
{
try
@ -521,7 +520,6 @@ private:
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
connection_parameters.timeouts,
"client",
connection_parameters.compression,
connection_parameters.security);
@ -537,11 +535,14 @@ private:
connection->setThrottler(throttler);
}
connection->getServerVersion(server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
connection->getServerVersion(connection_parameters.timeouts,
server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_version_patch);
if (server_display_name = connection->getServerDisplayName(); server_display_name.length() == 0)
if (
server_display_name = connection->getServerDisplayName(connection_parameters.timeouts);
server_display_name.length() == 0)
{
server_display_name = config().getString("host", "localhost");
}
@ -752,7 +753,7 @@ private:
}
if (!test_hint.checkActual(actual_server_error, actual_client_error, got_exception, last_exception))
connection->forceConnected();
connection->forceConnected(connection_parameters.timeouts);
if (got_exception && !ignore_error)
{
@ -828,7 +829,7 @@ private:
if (with_output && with_output->settings_ast)
apply_query_settings(*with_output->settings_ast);
connection->forceConnected();
connection->forceConnected(connection_parameters.timeouts);
/// INSERT query for which data transfer is needed (not an INSERT SELECT) is processed separately.
if (insert && !insert->select)
@ -899,7 +900,7 @@ private:
/// Process the query that doesn't require transferring data blocks to the server.
void processOrdinaryQuery()
{
connection->sendQuery(query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
connection->sendQuery(connection_parameters.timeouts, query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
sendExternalTables();
receiveResult();
}
@ -917,7 +918,7 @@ private:
if (!parsed_insert_query.data && (is_interactive || (stdin_is_not_tty && std_in.eof())))
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
connection->sendQuery(query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
connection->sendQuery(connection_parameters.timeouts, query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
sendExternalTables();
/// Receive description of table structure.
@ -1064,7 +1065,7 @@ private:
bool cancelled = false;
// TODO: get the poll_interval from commandline.
const auto receive_timeout = connection->getTimeouts().receive_timeout;
const auto receive_timeout = connection_parameters.timeouts.receive_timeout;
constexpr size_t default_poll_interval = 1000000; /// in microseconds
constexpr size_t min_poll_interval = 5000; /// in microseconds
const size_t poll_interval

View File

@ -14,6 +14,7 @@
#include <Common/typeid_cast.h>
#include <Columns/ColumnString.h>
#include <Client/Connection.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
@ -42,7 +43,7 @@ private:
"KILL", "QUERY", "SYNC", "ASYNC", "TEST", "BETWEEN", "TRUNCATE"
};
/// Words are fetched asynchonously.
/// Words are fetched asynchronously.
std::thread loading_thread;
std::atomic<bool> ready{false};
@ -71,7 +72,7 @@ private:
return word;
}
void loadImpl(Connection & connection, size_t suggestion_limit)
void loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit)
{
std::stringstream query;
query << "SELECT DISTINCT arrayJoin(extractAll(name, '[\\\\w_]{2,}')) AS res FROM ("
@ -104,12 +105,12 @@ private:
query << ") WHERE notEmpty(res)";
fetch(connection, query.str());
fetch(connection, timeouts, query.str());
}
void fetch(Connection & connection, const std::string & query)
void fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
connection.sendQuery(query);
connection.sendQuery(timeouts, query);
while (true)
{
@ -175,12 +176,11 @@ public:
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
connection_parameters.timeouts,
"client",
connection_parameters.compression,
connection_parameters.security);
loadImpl(connection, suggestion_limit);
loadImpl(connection, connection_parameters.timeouts, suggestion_limit);
}
catch (...)
{

View File

@ -54,6 +54,7 @@
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
@ -798,13 +799,13 @@ public:
}
void discoverShardPartitions(const TaskShardPtr & task_shard)
void discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard)
{
TaskTable & task_table = task_shard->task_table;
LOG_INFO(log, "Discover partitions of shard " << task_shard->getDescription());
auto get_partitions = [&] () { return getShardPartitions(*task_shard); };
auto get_partitions = [&] () { return getShardPartitions(timeouts, *task_shard); };
auto existing_partitions_names = retry(get_partitions, 60);
Strings filtered_partitions_names;
Strings missing_partitions;
@ -880,14 +881,14 @@ public:
}
/// Compute set of partitions, assume set of partitions aren't changed during the processing
void discoverTablePartitions(TaskTable & task_table, UInt64 num_threads = 0)
void discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads = 0)
{
/// Fetch partitions list from a shard
{
ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
for (const TaskShardPtr & task_shard : task_table.all_shards)
thread_pool.schedule([this, task_shard]() { discoverShardPartitions(task_shard); });
thread_pool.schedule([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
thread_pool.wait();
@ -955,7 +956,7 @@ public:
task_descprtion_current_version = version_to_update;
}
void process()
void process(const ConnectionTimeouts & timeouts)
{
for (TaskTable & task_table : task_cluster->table_tasks)
{
@ -969,7 +970,7 @@ public:
if (!task_table.has_enabled_partitions)
{
/// If there are no specified enabled_partitions, we must discover them manually
discoverTablePartitions(task_table);
discoverTablePartitions(timeouts, task_table);
/// After partitions of each shard are initialized, initialize cluster partitions
for (const TaskShardPtr & task_shard : task_table.all_shards)
@ -1009,7 +1010,7 @@ public:
bool table_is_done = false;
for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries)
{
if (tryProcessTable(task_table))
if (tryProcessTable(timeouts, task_table))
{
table_is_done = true;
break;
@ -1053,8 +1054,10 @@ protected:
return getWorkersPath() + "/" + host_id;
}
zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper,
const String & description, bool unprioritized)
zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(
const zkutil::ZooKeeperPtr & zookeeper,
const String & description,
bool unprioritized)
{
std::chrono::milliseconds current_sleep_time = default_sleep_time;
static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec
@ -1329,7 +1332,7 @@ protected:
static constexpr UInt64 max_table_tries = 1000;
static constexpr UInt64 max_shard_partition_tries = 600;
bool tryProcessTable(TaskTable & task_table)
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
{
/// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
bool previous_shard_is_instantly_finished = false;
@ -1360,7 +1363,7 @@ protected:
/// If not, did we check existence of that partition previously?
if (shard->checked_partitions.count(partition_name) == 0)
{
auto check_shard_has_partition = [&] () { return checkShardHasPartition(*shard, partition_name); };
auto check_shard_has_partition = [&] () { return checkShardHasPartition(timeouts, *shard, partition_name); };
bool has_partition = retry(check_shard_has_partition);
shard->checked_partitions.emplace(partition_name);
@ -1397,7 +1400,7 @@ protected:
bool was_error = false;
for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
{
task_status = tryProcessPartitionTask(partition, is_unprioritized_task);
task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task);
/// Exit if success
if (task_status == PartitionTaskStatus::Finished)
@ -1483,13 +1486,13 @@ protected:
Error,
};
PartitionTaskStatus tryProcessPartitionTask(ShardPartition & task_partition, bool is_unprioritized_task)
PartitionTaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
{
PartitionTaskStatus res;
try
{
res = processPartitionTaskImpl(task_partition, is_unprioritized_task);
res = processPartitionTaskImpl(timeouts, task_partition, is_unprioritized_task);
}
catch (...)
{
@ -1510,7 +1513,7 @@ protected:
return res;
}
PartitionTaskStatus processPartitionTaskImpl(ShardPartition & task_partition, bool is_unprioritized_task)
PartitionTaskStatus processPartitionTaskImpl(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
{
TaskShard & task_shard = task_partition.task_shard;
TaskTable & task_table = task_shard.task_table;
@ -1611,7 +1614,7 @@ protected:
zookeeper->createAncestors(current_task_status_path);
/// We need to update table definitions for each partition, it could be changed after ALTER
createShardInternalTables(task_shard);
createShardInternalTables(timeouts, task_shard);
/// Check that destination partition is empty if we are first worker
/// NOTE: this check is incorrect if pull and push tables have different partition key!
@ -1828,23 +1831,25 @@ protected:
return typeid_cast<const ColumnString &>(*block.safeGetByPosition(0).column).getDataAt(0).toString();
}
ASTPtr getCreateTableForPullShard(TaskShard & task_shard)
ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
/// Fetch and parse (possibly) new definition
auto connection_entry = task_shard.info.pool->get(&task_cluster->settings_pull);
String create_query_pull_str = getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry,
&task_cluster->settings_pull);
auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_pull);
String create_query_pull_str = getRemoteCreateTable(
task_shard.task_table.table_pull,
*connection_entry,
&task_cluster->settings_pull);
ParserCreateQuery parser_create_query;
return parseQuery(parser_create_query, create_query_pull_str, 0);
}
void createShardInternalTables(TaskShard & task_shard, bool create_split = true)
void createShardInternalTables(const ConnectionTimeouts & timeouts, TaskShard & task_shard, bool create_split = true)
{
TaskTable & task_table = task_shard.task_table;
/// We need to update table definitions for each part, it could be changed after ALTER
task_shard.current_pull_table_create_query = getCreateTableForPullShard(task_shard);
task_shard.current_pull_table_create_query = getCreateTableForPullShard(timeouts, task_shard);
/// Create local Distributed tables:
/// a table fetching data from current shard and a table inserting data to the whole destination cluster
@ -1872,9 +1877,9 @@ protected:
}
std::set<String> getShardPartitions(TaskShard & task_shard)
std::set<String> getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
createShardInternalTables(task_shard, false);
createShardInternalTables(timeouts, task_shard, false);
TaskTable & task_table = task_shard.task_table;
@ -1914,9 +1919,9 @@ protected:
return res;
}
bool checkShardHasPartition(TaskShard & task_shard, const String & partition_quoted_name)
bool checkShardHasPartition(const ConnectionTimeouts & timeouts, TaskShard & task_shard, const String & partition_quoted_name)
{
createShardInternalTables(task_shard, false);
createShardInternalTables(timeouts, task_shard, false);
TaskTable & task_table = task_shard.task_table;
@ -1998,7 +2003,8 @@ protected:
Settings current_settings = settings ? *settings : task_cluster->settings_common;
current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
auto connections = shard.pool->getMany(&current_settings, pool_mode);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings).getSaturated(current_settings.max_execution_time);
auto connections = shard.pool->getMany(timeouts, &current_settings, pool_mode);
for (auto & connection : connections)
{
@ -2187,7 +2193,7 @@ void ClusterCopierApp::mainImpl()
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
copier->init();
copier->process();
copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef()));
}

View File

@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Common/CpuId.h>
#include <common/getMemoryAmount.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -24,6 +25,7 @@ namespace
void waitQuery(Connection & connection)
{
bool finished = false;
while (true)
{
if (!connection.poll(1000000))
@ -50,12 +52,14 @@ namespace fs = boost::filesystem;
PerformanceTest::PerformanceTest(
const XMLConfigurationPtr & config_,
Connection & connection_,
const ConnectionTimeouts & timeouts_,
InterruptListener & interrupt_listener_,
const PerformanceTestInfo & test_info_,
Context & context_,
const std::vector<size_t> & queries_to_run_)
: config(config_)
, connection(connection_)
, timeouts(timeouts_)
, interrupt_listener(interrupt_listener_)
, test_info(test_info_)
, context(context_)
@ -108,7 +112,7 @@ bool PerformanceTest::checkPreconditions() const
size_t exist = 0;
connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
while (true)
{
@ -188,7 +192,7 @@ void PerformanceTest::prepare() const
for (const auto & query : test_info.create_and_fill_queries)
{
LOG_INFO(log, "Executing create or fill query \"" << query << '\"');
connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
waitQuery(connection);
LOG_INFO(log, "Query finished");
}
@ -200,7 +204,7 @@ void PerformanceTest::finish() const
for (const auto & query : test_info.drop_queries)
{
LOG_INFO(log, "Executing drop query \"" << query << '\"');
connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
waitQuery(connection);
LOG_INFO(log, "Query finished");
}

View File

@ -5,6 +5,7 @@
#include <common/logger_useful.h>
#include <Poco/Util/XMLConfiguration.h>
#include <IO/ConnectionTimeouts.h>
#include "PerformanceTestInfo.h"
namespace DB
@ -20,6 +21,7 @@ public:
PerformanceTest(
const XMLConfigurationPtr & config_,
Connection & connection_,
const ConnectionTimeouts & timeouts_,
InterruptListener & interrupt_listener_,
const PerformanceTestInfo & test_info_,
Context & context_,
@ -45,6 +47,7 @@ private:
private:
XMLConfigurationPtr config;
Connection & connection;
const ConnectionTimeouts & timeouts;
InterruptListener & interrupt_listener;
PerformanceTestInfo test_info;

View File

@ -72,10 +72,11 @@ public:
Strings && tests_names_regexp_,
Strings && skip_names_regexp_,
const std::unordered_map<std::string, std::vector<size_t>> query_indexes_,
const ConnectionTimeouts & timeouts)
const ConnectionTimeouts & timeouts_)
: connection(host_, port_, default_database_, user_,
password_, timeouts, "performance-test", Protocol::Compression::Enable,
password_, "performance-test", Protocol::Compression::Enable,
secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable)
, timeouts(timeouts_)
, tests_tags(std::move(tests_tags_))
, tests_names(std::move(tests_names_))
, tests_names_regexp(std::move(tests_names_regexp_))
@ -100,7 +101,7 @@ public:
UInt64 version_minor;
UInt64 version_patch;
UInt64 version_revision;
connection.getServerVersion(name, version_major, version_minor, version_patch, version_revision);
connection.getServerVersion(timeouts, name, version_major, version_minor, version_patch, version_revision);
std::stringstream ss;
ss << version_major << "." << version_minor << "." << version_patch;
@ -115,6 +116,7 @@ public:
private:
Connection connection;
const ConnectionTimeouts & timeouts;
const Strings & tests_tags;
const Strings & tests_names;
@ -195,7 +197,7 @@ private:
{
PerformanceTestInfo info(test_config, profiles_file, global_context.getSettingsRef());
LOG_INFO(log, "Config for test '" << info.test_name << "' parsed");
PerformanceTest current(test_config, connection, interrupt_listener, info, global_context, query_indexes[info.path]);
PerformanceTest current(test_config, connection, timeouts, interrupt_listener, info, global_context, query_indexes[info.path]);
if (current.checkPreconditions())
{

View File

@ -48,7 +48,7 @@ namespace ErrorCodes
}
void Connection::connect()
void Connection::connect(const ConnectionTimeouts & timeouts)
{
try
{
@ -230,10 +230,15 @@ UInt16 Connection::getPort() const
return port;
}
void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch, UInt64 & revision)
void Connection::getServerVersion(const ConnectionTimeouts & timeouts,
String & name,
UInt64 & version_major,
UInt64 & version_minor,
UInt64 & version_patch,
UInt64 & revision)
{
if (!connected)
connect();
connect(timeouts);
name = server_name;
version_major = server_version_major;
@ -242,40 +247,40 @@ void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64
revision = server_revision;
}
UInt64 Connection::getServerRevision()
UInt64 Connection::getServerRevision(const ConnectionTimeouts & timeouts)
{
if (!connected)
connect();
connect(timeouts);
return server_revision;
}
const String & Connection::getServerTimezone()
const String & Connection::getServerTimezone(const ConnectionTimeouts & timeouts)
{
if (!connected)
connect();
connect(timeouts);
return server_timezone;
}
const String & Connection::getServerDisplayName()
const String & Connection::getServerDisplayName(const ConnectionTimeouts & timeouts)
{
if (!connected)
connect();
connect(timeouts);
return server_display_name;
}
void Connection::forceConnected()
void Connection::forceConnected(const ConnectionTimeouts & timeouts)
{
if (!connected)
{
connect();
connect(timeouts);
}
else if (!ping())
{
LOG_TRACE(log_wrapper.get(), "Connection was closed, will reconnect.");
connect();
connect(timeouts);
}
}
@ -318,10 +323,11 @@ bool Connection::ping()
return true;
}
TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & request)
TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & timeouts,
const TablesStatusRequest & request)
{
if (!connected)
connect();
connect(timeouts);
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
@ -344,6 +350,7 @@ TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & req
void Connection::sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_,
UInt64 stage,
@ -352,7 +359,9 @@ void Connection::sendQuery(
bool with_pending_data)
{
if (!connected)
connect();
connect(timeouts);
TimeoutSetter timeout_setter(*socket, timeouts.send_timeout, timeouts.receive_timeout, true);
if (settings)
{

View File

@ -57,7 +57,6 @@ public:
Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const ConnectionTimeouts & timeouts_,
const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Secure secure_ = Protocol::Secure::Disable,
@ -68,7 +67,6 @@ public:
client_name(client_name_),
compression(compression_),
secure(secure_),
timeouts(timeouts_),
sync_request_timeout(sync_request_timeout_),
log_wrapper(*this)
{
@ -106,11 +104,16 @@ public:
/// Change default database. Changes will take effect on next reconnect.
void setDefaultDatabase(const String & database);
void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch, UInt64 & revision);
UInt64 getServerRevision();
void getServerVersion(const ConnectionTimeouts & timeouts,
String & name,
UInt64 & version_major,
UInt64 & version_minor,
UInt64 & version_patch,
UInt64 & revision);
UInt64 getServerRevision(const ConnectionTimeouts & timeouts);
const String & getServerTimezone();
const String & getServerDisplayName();
const String & getServerTimezone(const ConnectionTimeouts & timeouts);
const String & getServerDisplayName(const ConnectionTimeouts & timeouts);
/// For log and exception messages.
const String & getDescription() const;
@ -118,14 +121,9 @@ public:
UInt16 getPort() const;
const String & getDefaultDatabase() const;
/// For proper polling.
inline const auto & getTimeouts() const
{
return timeouts;
}
/// If last flag is true, you need to call sendExternalTablesData after.
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_ = "",
UInt64 stage = QueryProcessingStage::Complete,
@ -156,9 +154,10 @@ public:
Packet receivePacket();
/// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception.
void forceConnected();
void forceConnected(const ConnectionTimeouts & timeouts);
TablesStatusResponse getTablesStatus(const TablesStatusRequest & request);
TablesStatusResponse getTablesStatus(const ConnectionTimeouts & timeouts,
const TablesStatusRequest & request);
/** Disconnect.
* This may be used, if connection is left in unsynchronised state
@ -216,7 +215,6 @@ private:
*/
ThrottlerPtr throttler;
ConnectionTimeouts timeouts;
Poco::Timespan sync_request_timeout;
/// From where to read query execution result.
@ -252,7 +250,7 @@ private:
LoggerWrapper log_wrapper;
void connect();
void connect(const ConnectionTimeouts & timeouts);
void sendHello();
void receiveHello();
bool ping();

View File

@ -30,7 +30,9 @@ public:
/// Selects the connection to work.
/// If force_connected is false, the client must manually ensure that returned connection is good.
virtual Entry get(const Settings * settings = nullptr, bool force_connected = true) = 0;
virtual Entry get(const ConnectionTimeouts & timeouts,
const Settings * settings = nullptr,
bool force_connected = true) = 0;
};
using ConnectionPoolPtr = std::shared_ptr<IConnectionPool>;
@ -50,7 +52,6 @@ public:
const String & default_database_,
const String & user_,
const String & password_,
const ConnectionTimeouts & timeouts,
const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Secure secure_ = Protocol::Secure::Disable)
@ -63,12 +64,13 @@ public:
password(password_),
client_name(client_name_),
compression(compression_),
secure{secure_},
timeouts(timeouts)
secure{secure_}
{
}
Entry get(const Settings * settings = nullptr, bool force_connected = true) override
Entry get(const ConnectionTimeouts & timeouts,
const Settings * settings = nullptr,
bool force_connected = true) override
{
Entry entry;
if (settings)
@ -77,7 +79,7 @@ public:
entry = Base::get(-1);
if (force_connected)
entry->forceConnected();
entry->forceConnected(timeouts);
return entry;
}
@ -93,7 +95,7 @@ protected:
{
return std::make_shared<Connection>(
host, port,
default_database, user, password, timeouts,
default_database, user, password,
client_name, compression, secure);
}
@ -108,7 +110,6 @@ private:
Protocol::Compression compression; /// Whether to compress data when interacting with the server.
Protocol::Secure secure; /// Whether to encrypt data when interacting with the server.
ConnectionTimeouts timeouts;
};
}

View File

@ -8,6 +8,8 @@
#include <Common/ProfileEvents.h>
#include <Core/Settings.h>
#include <IO/ConnectionTimeouts.h>
namespace ProfileEvents
{
@ -29,9 +31,8 @@ namespace ErrorCodes
ConnectionPoolWithFailover::ConnectionPoolWithFailover(
ConnectionPoolPtrs nested_pools_,
LoadBalancing load_balancing,
size_t max_tries_,
time_t decrease_error_period_)
: Base(std::move(nested_pools_), max_tries_, decrease_error_period_, &Logger::get("ConnectionPoolWithFailover"))
: Base(std::move(nested_pools_), decrease_error_period_, &Logger::get("ConnectionPoolWithFailover"))
, default_load_balancing(load_balancing)
{
const std::string & local_hostname = getFQDNOrHostName();
@ -44,11 +45,13 @@ ConnectionPoolWithFailover::ConnectionPoolWithFailover(
}
}
IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings, bool /*force_connected*/)
IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts & timeouts,
const Settings * settings,
bool /*force_connected*/)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
return tryGetEntry(pool, timeouts, fail_message, settings);
};
GetPriorityFunc get_priority;
@ -70,11 +73,13 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings
return Base::get(try_get_entry, get_priority);
}
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Settings * settings, PoolMode pool_mode)
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
return tryGetEntry(pool, timeouts, fail_message, settings);
};
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry);
@ -86,22 +91,27 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Se
return entries;
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(const Settings * settings, PoolMode pool_mode)
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(
const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
return tryGetEntry(pool, timeouts, fail_message, settings);
};
return getManyImpl(settings, pool_mode, try_get_entry);
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check)
const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode,
const QualifiedTableName & table_to_check)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings, &table_to_check);
return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check);
};
return getManyImpl(settings, pool_mode, try_get_entry);
@ -113,6 +123,9 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
const TryGetEntryFunc & try_get_entry)
{
size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
size_t max_tries = (settings ?
size_t{settings->connections_with_failover_max_tries} :
size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
size_t max_entries;
if (pool_mode == PoolMode::GET_ALL)
{
@ -144,12 +157,13 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
bool fallback_to_stale_replicas = settings ? bool(settings->fallback_to_stale_replicas_for_distributed_queries) : true;
return Base::getMany(min_entries, max_entries, try_get_entry, get_priority, fallback_to_stale_replicas);
return Base::getMany(min_entries, max_entries, max_tries, try_get_entry, get_priority, fallback_to_stale_replicas);
}
ConnectionPoolWithFailover::TryResult
ConnectionPoolWithFailover::tryGetEntry(
IConnectionPool & pool,
const ConnectionTimeouts & timeouts,
std::string & fail_message,
const Settings * settings,
const QualifiedTableName * table_to_check)
@ -157,15 +171,15 @@ ConnectionPoolWithFailover::tryGetEntry(
TryResult result;
try
{
result.entry = pool.get(settings, /* force_connected = */ false);
result.entry = pool.get(timeouts, settings, /* force_connected = */ false);
UInt64 server_revision = 0;
if (table_to_check)
server_revision = result.entry->getServerRevision();
server_revision = result.entry->getServerRevision(timeouts);
if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
{
result.entry->forceConnected();
result.entry->forceConnected(timeouts);
result.is_usable = true;
result.is_up_to_date = true;
return result;
@ -176,7 +190,7 @@ ConnectionPoolWithFailover::tryGetEntry(
TablesStatusRequest status_request;
status_request.tables.emplace(*table_to_check);
TablesStatusResponse status_response = result.entry->getTablesStatus(status_request);
TablesStatusResponse status_response = result.entry->getTablesStatus(timeouts, status_request);
auto table_status_it = status_response.table_states_by_id.find(*table_to_check);
if (table_status_it == status_response.table_states_by_id.end())
{

View File

@ -34,21 +34,24 @@ public:
ConnectionPoolWithFailover(
ConnectionPoolPtrs nested_pools_,
LoadBalancing load_balancing,
size_t max_tries_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
time_t decrease_error_period_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD);
using Entry = IConnectionPool::Entry;
/** Allocates connection to work. */
Entry get(const Settings * settings = nullptr, bool force_connected = true) override; /// From IConnectionPool
Entry get(const ConnectionTimeouts & timeouts,
const Settings * settings = nullptr,
bool force_connected = true) override; /// From IConnectionPool
/** Allocates up to the specified number of connections to work.
* Connections provide access to different replicas of one shard.
*/
std::vector<Entry> getMany(const Settings * settings, PoolMode pool_mode);
std::vector<Entry> getMany(const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode);
/// The same as getMany(), but return std::vector<TryResult>.
std::vector<TryResult> getManyForTableFunction(const Settings * settings, PoolMode pool_mode);
std::vector<TryResult> getManyForTableFunction(const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode);
using Base = PoolWithFailoverBase<IConnectionPool>;
using TryResult = Base::TryResult;
@ -56,7 +59,10 @@ public:
/// The same as getMany(), but check that replication delay for table_to_check is acceptable.
/// Delay threshold is taken from settings.
std::vector<TryResult> getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check);
const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check);
private:
/// Get the values of relevant settings and call Base::getMany()
@ -70,6 +76,7 @@ private:
/// for this table is not too large.
TryResult tryGetEntry(
IConnectionPool & pool,
const ConnectionTimeouts & timeouts,
std::string & fail_message,
const Settings * settings,
const QualifiedTableName * table_to_check = nullptr);

View File

@ -1,4 +1,5 @@
#include <Client/MultiplexedConnections.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@ -73,6 +74,7 @@ void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesDa
}
void MultiplexedConnections::sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id,
UInt64 stage,
@ -91,7 +93,7 @@ void MultiplexedConnections::sendQuery(
if (!replica.connection)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
if (replica.connection->getServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
if (replica.connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
{
/// Disable two-level aggregation due to version incompatibility.
modified_settings.group_by_two_level_threshold = 0;
@ -107,13 +109,15 @@ void MultiplexedConnections::sendQuery(
for (size_t i = 0; i < num_replicas; ++i)
{
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
replica_states[i].connection->sendQuery(timeouts, query, query_id,
stage, &modified_settings, client_info, with_pending_data);
}
}
else
{
/// Use single replica.
replica_states[0].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
replica_states[0].connection->sendQuery(timeouts, query, query_id, stage,
&modified_settings, client_info, with_pending_data);
}
sent_query = true;

View File

@ -1,9 +1,10 @@
#pragma once
#include <mutex>
#include <Common/Throttler.h>
#include <Client/Connection.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <mutex>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@ -31,6 +32,7 @@ public:
/// Send request to replicas.
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete,

View File

@ -55,11 +55,9 @@ public:
PoolWithFailoverBase(
NestedPools nested_pools_,
size_t max_tries_,
time_t decrease_error_period_,
Logger * log_)
: nested_pools(std::move(nested_pools_))
, max_tries(max_tries_)
, decrease_error_period(decrease_error_period_)
, shared_pool_states(nested_pools.size())
, log(log_)
@ -108,7 +106,7 @@ public:
/// The method will throw if it is unable to get min_entries alive connections or
/// if fallback_to_stale_replicas is false and it is unable to get min_entries connections to up-to-date replicas.
std::vector<TryResult> getMany(
size_t min_entries, size_t max_entries,
size_t min_entries, size_t max_entries, size_t max_tries,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority = GetPriorityFunc(),
bool fallback_to_stale_replicas = true);
@ -125,8 +123,6 @@ protected:
NestedPools nested_pools;
const size_t max_tries;
const time_t decrease_error_period;
std::mutex pool_states_mutex;
@ -141,7 +137,7 @@ template <typename TNestedPool>
typename TNestedPool::Entry
PoolWithFailoverBase<TNestedPool>::get(const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority)
{
std::vector<TryResult> results = getMany(1, 1, try_get_entry, get_priority);
std::vector<TryResult> results = getMany(1, 1, 1, try_get_entry, get_priority);
if (results.empty() || results[0].entry.isNull())
throw DB::Exception(
"PoolWithFailoverBase::getMany() returned less than min_entries entries.",
@ -152,7 +148,7 @@ PoolWithFailoverBase<TNestedPool>::get(const TryGetEntryFunc & try_get_entry, co
template <typename TNestedPool>
std::vector<typename PoolWithFailoverBase<TNestedPool>::TryResult>
PoolWithFailoverBase<TNestedPool>::getMany(
size_t min_entries, size_t max_entries,
size_t min_entries, size_t max_entries, size_t max_tries,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority,
bool fallback_to_stale_replicas)
@ -192,7 +188,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
size_t up_to_date_count = 0;
size_t failed_pools_count = 0;
/// At exit update shared error counts with error counts occured during this call.
/// At exit update shared error counts with error counts occurred during this call.
SCOPE_EXIT(
{
std::lock_guard lock(pool_states_mutex);

View File

@ -7,6 +7,8 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Storages/IStorage.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@ -61,17 +63,17 @@ RemoteBlockInputStream::RemoteBlockInputStream(
create_multiplexed_connections = [this, pool, throttler]()
{
const Settings & current_settings = context.getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
std::vector<IConnectionPool::Entry> connections;
if (main_table)
{
auto try_results = pool->getManyChecked(&current_settings, pool_mode, *main_table);
auto try_results = pool->getManyChecked(timeouts, &current_settings, pool_mode, *main_table);
connections.reserve(try_results.size());
for (auto & try_result : try_results)
connections.emplace_back(std::move(try_result.entry));
}
else
connections = pool->getMany(&current_settings, pool_mode);
connections = pool->getMany(timeouts, &current_settings, pool_mode);
return std::make_unique<MultiplexedConnections>(
std::move(connections), current_settings, throttler);
@ -283,12 +285,14 @@ void RemoteBlockInputStream::sendQuery()
{
multiplexed_connections = create_multiplexed_connections();
if (context.getSettingsRef().skip_unavailable_shards && 0 == multiplexed_connections->size())
const auto& settings = context.getSettingsRef();
if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
return;
established = true;
multiplexed_connections->sendQuery(query, "", stage, &context.getClientInfo(), true);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
multiplexed_connections->sendQuery(timeouts, query, "", stage, &context.getClientInfo(), true);
established = false;
sent_query = true;

View File

@ -96,6 +96,7 @@ private:
const String query;
Context context;
/// Temporary tables needed to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
@ -118,7 +119,7 @@ private:
*/
std::atomic<bool> finished { false };
/** Cancel query request was sent to all replicas beacuse data is not needed anymore
/** Cancel query request was sent to all replicas because data is not needed anymore
* This behaviour may occur when:
* - data size is already satisfactory (when using LIMIT, for example)
* - an exception was thrown from client side

View File

@ -6,6 +6,7 @@
#include <Common/NetException.h>
#include <Common/CurrentThread.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
@ -18,13 +19,16 @@ namespace ErrorCodes
}
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_)
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
const ConnectionTimeouts & timeouts,
const String & query_,
const Settings * settings_)
: connection(connection_), query(query_), settings(settings_)
{
/** Send query and receive "header", that describe table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/
connection.sendQuery(query, "", QueryProcessingStage::Complete, settings, nullptr);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, settings, nullptr);
while (true)
{

View File

@ -3,6 +3,7 @@
#include <Core/Block.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/Throttler.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
@ -18,7 +19,10 @@ struct Settings;
class RemoteBlockOutputStream : public IBlockOutputStream
{
public:
RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_ = nullptr);
RemoteBlockOutputStream(Connection & connection_,
const ConnectionTimeouts & timeouts,
const String & query_,
const Settings * settings_ = nullptr);
Block getHeader() const override { return header; }

View File

@ -43,10 +43,13 @@ private:
public:
using ExceptionCallback = std::function<void()>;
UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads,
ExceptionCallback exception_callback_ = ExceptionCallback()) :
output_queue(std::min(inputs.size(), max_threads)),
handler(*this),
UnionBlockInputStream(
BlockInputStreams inputs,
BlockInputStreamPtr additional_input_at_end,
size_t max_threads,
ExceptionCallback exception_callback_ = ExceptionCallback()
) :
output_queue(std::min(inputs.size(), max_threads)), handler(*this),
processor(inputs, additional_input_at_end, max_threads, handler),
exception_callback(exception_callback_)
{

View File

@ -30,10 +30,8 @@ static ConnectionPoolWithFailoverPtr createPool(
bool secure,
const std::string & db,
const std::string & user,
const std::string & password,
const Context & context)
const std::string & password)
{
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(context.getSettingsRef());
ConnectionPoolPtrs pools;
pools.emplace_back(std::make_shared<ConnectionPool>(
MAX_CONNECTIONS,
@ -42,7 +40,6 @@ static ConnectionPoolWithFailoverPtr createPool(
db,
user,
password,
timeouts,
"ClickHouseDictionarySource",
Protocol::Compression::Enable,
secure ? Protocol::Secure::Enable : Protocol::Secure::Disable));
@ -72,7 +69,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
, sample_block{sample_block}
, context(context_)
, is_local{isLocalAddress({host, port}, context.getTCPPort())}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
, load_all_query{query_builder.composeLoadAllQuery()}
{
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
@ -98,7 +95,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
, sample_block{other.sample_block}
, context(other.context)
, is_local{other.is_local}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
, load_all_query{other.load_all_query}
{
}
@ -179,6 +176,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
{
if (is_local)
return executeQuery(query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
}

View File

@ -250,11 +250,10 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
settings.distributed_connections_pool_size,
address.host_name, address.port,
address.default_database, address.user, address.password,
ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings).getSaturated(settings.max_execution_time),
"server", address.compression, address.secure);
info.pool = std::make_shared<ConnectionPoolWithFailover>(
ConnectionPoolPtrs{pool}, settings.load_balancing, settings.connections_with_failover_max_tries);
ConnectionPoolPtrs{pool}, settings.load_balancing);
info.per_replica_pools = {std::move(pool)};
if (weight)
@ -322,7 +321,6 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
ConnectionTimeouts::getTCPTimeoutsWithFailover(settings).getSaturated(settings.max_execution_time),
"server", replica.compression, replica.secure);
all_replicas_pools.emplace_back(replica_pool);
@ -331,7 +329,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
all_replicas_pools, settings.load_balancing, settings.connections_with_failover_max_tries);
all_replicas_pools, settings.load_balancing);
if (weight)
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
@ -375,7 +373,6 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
ConnectionTimeouts::getTCPTimeoutsWithFailover(settings).getSaturated(settings.max_execution_time),
"server", replica.compression, replica.secure);
all_replicas.emplace_back(replica_pool);
if (replica.is_local && !treat_local_as_remote)
@ -383,7 +380,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
all_replicas, settings.load_balancing, settings.connections_with_failover_max_tries);
all_replicas, settings.load_balancing);
slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size());
shards_info.push_back({{}, current_shard_num, default_weight, std::move(shard_local_addresses), std::move(shard_pool),

View File

@ -184,13 +184,17 @@ void SelectStreamFactory::createForShard(
local_delay]()
-> BlockInputStreamPtr
{
auto current_settings = context.getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
current_settings).getSaturated(
current_settings.max_execution_time);
std::vector<ConnectionPoolWithFailover::TryResult> try_results;
try
{
if (table_func_ptr)
try_results = pool->getManyForTableFunction(&context.getSettingsRef(), PoolMode::GET_MANY);
try_results = pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
else
try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
try_results = pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, main_table);
}
catch (const Exception & ex)
{

View File

@ -11,6 +11,7 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <boost/algorithm/string/find_iterator.hpp>
@ -142,8 +143,7 @@ void StorageDistributedDirectoryMonitor::run()
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
const auto pool_factory = [&storage, &timeouts] (const Cluster::Address & address) -> ConnectionPoolPtr
const auto pool_factory = [&storage] (const Cluster::Address & address) -> ConnectionPoolPtr
{
const auto & cluster = storage.getCluster();
const auto & shards_info = cluster->getShardsInfo();
@ -164,7 +164,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
}
return std::make_shared<ConnectionPool>(
1, address.host_name, address.port, address.default_database, address.user, address.password, timeouts,
1, address.host_name, address.port, address.default_database, address.user, address.password,
storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);
};
@ -212,7 +212,8 @@ bool StorageDistributedDirectoryMonitor::findFiles()
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
{
LOG_TRACE(log, "Started processing `" << file_path << '`');
auto connection = pool->get();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
auto connection = pool->get(timeouts);
try
{
@ -224,7 +225,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
std::string insert_query;
readQueryAndSettings(in, insert_settings, insert_query);
RemoteBlockOutputStream remote{*connection, insert_query, &insert_settings};
RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings};
remote.writePrefix();
remote.writePrepared(in);
@ -334,8 +335,8 @@ struct StorageDistributedDirectoryMonitor::Batch
WriteBufferFromFile out{parent.current_batch_file_path};
writeText(out);
}
auto connection = parent.pool->get();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context.getSettingsRef());
auto connection = parent.pool->get(timeouts);
bool batch_broken = false;
try
@ -361,7 +362,7 @@ struct StorageDistributedDirectoryMonitor::Batch
if (first)
{
first = false;
remote = std::make_unique<RemoteBlockOutputStream>(*connection, insert_query, &insert_settings);
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, &insert_settings);
remote->writePrefix();
}

View File

@ -242,6 +242,8 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
{
if (!job.stream)
{
const Settings & settings = context.getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
if (shard_info.hasInternalReplication())
{
/// Skip replica_index in case of internal replication
@ -249,7 +251,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
throw Exception("There are several writing job for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR);
/// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here
auto connections = shard_info.pool->getMany(&context.getSettingsRef(), PoolMode::GET_ONE);
auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE);
if (connections.empty() || connections.front().isNull())
throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR);
@ -263,7 +265,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
if (!connection_pool)
throw Exception("Connection pool for replica " + replica.readableString() + " does not exist", ErrorCodes::LOGICAL_ERROR);
job.connection_entry = connection_pool->get(&context.getSettingsRef());
job.connection_entry = connection_pool->get(timeouts, &settings);
if (job.connection_entry.isNull())
throw Exception("Got empty connection for replica" + replica.readableString(), ErrorCodes::LOGICAL_ERROR);
}
@ -271,7 +273,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
if (throttler)
job.connection_entry->setThrottler(throttler);
job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, query_string, &context.getSettingsRef());
job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, &settings);
job.stream->writePrefix();
}

View File

@ -3986,8 +3986,6 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
else
throw Exception("Can't proxy this query. Unsupported query type", ErrorCodes::NOT_IMPLEMENTED);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithoutFailover(global_context.getSettingsRef());
const auto & query_settings = query_context.getSettingsRef();
const auto & query_client_info = query_context.getClientInfo();
String user = query_client_info.current_user;
@ -4003,7 +4001,7 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
leader_address.host,
leader_address.queries_port,
leader_address.database,
user, password, timeouts, "Follower replica");
user, password, "Follower replica");
std::stringstream new_query_ss;
formatAST(*new_query, new_query_ss, false, true);

View File

@ -0,0 +1,18 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,35 @@
<yandex>
<profiles>
<default>
<connections_with_failover_max_tries>3</connections_with_failover_max_tries>
<connect_timeout_with_failover_ms>1000</connect_timeout_with_failover_ms>
<min_insert_block_size_rows>1</min_insert_block_size_rows>
</default>
<delays>
<connections_with_failover_max_tries>5</connections_with_failover_max_tries>
<connect_timeout_with_failover_ms>3000</connect_timeout_with_failover_ms>
<min_insert_block_size_rows>1</min_insert_block_size_rows>
</delays>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
<ready_to_wait>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>delays</profile>
<quota>default</quota>
</ready_to_wait>
</users>
<quotas><default></default></quotas>
</yandex>

View File

@ -0,0 +1,141 @@
import itertools
import timeit
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
NODES = {'node' + str(i): cluster.add_instance(
'node' + str(i),
main_configs=['configs/remote_servers.xml'],
user_configs=['configs/set_distributed_defaults.xml'],
) for i in (1, 2)}
CREATE_TABLES_SQL = '''
CREATE DATABASE test;
CREATE TABLE base_table(
node String
)
ENGINE = MergeTree
PARTITION BY node
ORDER BY node;
CREATE TABLE distributed_table
ENGINE = Distributed(test_cluster, default, base_table) AS base_table;
'''
INSERT_SQL_TEMPLATE = "INSERT INTO base_table VALUES ('{node_id}')"
SELECTS_SQL = {
'distributed': 'SELECT node FROM distributed_table ORDER BY node',
'remote': ("SELECT node FROM remote('node1,node2', default.base_table) "
"ORDER BY node"),
}
EXCEPTION_NETWORK = 'e.displayText() = DB::NetException: '
EXCEPTION_TIMEOUT = 'Timeout exceeded while reading from socket ('
EXCEPTION_CONNECT = 'Timeout: connect timed out: '
TIMEOUT_MEASUREMENT_EPS = 0.01
EXPECTED_BEHAVIOR = {
'default': {
'times': 3,
'timeout': 1,
},
'ready_to_wait': {
'times': 5,
'timeout': 3,
},
}
def _check_exception(exception, expected_tries=3):
lines = exception.split('\n')
assert len(lines) > 4, "Unexpected exception (expected: timeout info)"
assert lines[0].startswith('Received exception from server (version')
assert lines[1].startswith('Code: 279')
assert lines[1].endswith('All connection tries failed. Log: ')
assert lines[2] == '', "Unexpected exception text (expected: empty line)"
for i, line in enumerate(lines[3:3 + expected_tries]):
expected_lines = (
'Code: 209, ' + EXCEPTION_NETWORK + EXCEPTION_TIMEOUT,
'Code: 209, ' + EXCEPTION_NETWORK + EXCEPTION_CONNECT,
)
assert any(line.startswith(expected) for expected in expected_lines), \
'Unexpected exception at one of the connection attempts'
assert lines[3 + expected_tries] == '', 'Wrong number of connect attempts'
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for node_id, node in NODES.items():
node.query(CREATE_TABLES_SQL)
node.query(INSERT_SQL_TEMPLATE.format(node_id=node_id))
yield cluster
finally:
cluster.shutdown()
def _check_timeout_and_exception(node, user, query_base):
repeats = EXPECTED_BEHAVIOR[user]['times']
expected_timeout = EXPECTED_BEHAVIOR[user]['timeout'] * repeats
start = timeit.default_timer()
exception = node.query_and_get_error(SELECTS_SQL[query_base], user=user)
# And it should timeout no faster than:
measured_timeout = timeit.default_timer() - start
assert measured_timeout >= expected_timeout - TIMEOUT_MEASUREMENT_EPS
# And exception should reflect connection attempts:
_check_exception(exception, repeats)
@pytest.mark.parametrize(
('first_user', 'node_name', 'query_base'),
tuple(itertools.product(EXPECTED_BEHAVIOR, NODES, SELECTS_SQL)),
)
def test_reconnect(started_cluster, node_name, first_user, query_base):
node = NODES[node_name]
# Everything is up, select should work:
assert TSV(node.query(SELECTS_SQL[query_base],
user=first_user)) == TSV('node1\nnode2')
with PartitionManager() as pm:
# Break the connection.
pm.partition_instances(*NODES.values())
# Now it shouldn't:
_check_timeout_and_exception(node, first_user, query_base)
# Other user should have different timeout and exception
_check_timeout_and_exception(
node,
'default' if first_user != 'default' else 'ready_to_wait',
query_base,
)
# select should work again:
assert TSV(node.query(SELECTS_SQL[query_base],
user=first_user)) == TSV('node1\nnode2')