Merge remote-tracking branch 'main/master' into crc32-for-master

This commit is contained in:
Ivan Remen 2019-06-18 15:29:50 +03:00
commit 3fc8c8726f
62 changed files with 1660 additions and 352 deletions

View File

@ -13,5 +13,6 @@ ClickHouse is an open-source column-oriented database management system that all
## Upcoming Events
* [ClickHouse on HighLoad++ Siberia](https://www.highload.ru/siberia/2019/abstracts/5348) on June 24-25.
* [ClickHouse Community Meetup in Shenzhen](https://www.huodongxing.com/event/3483759917300) on October 20.
* [ClickHouse Community Meetup in Shanghai](https://www.huodongxing.com/event/4483760336000) on October 27.
* [ClickHouse Meetup in Novosibirsk](https://events.yandex.ru/events/ClickHouse/26-June-2019/) on June 26.
* [ClickHouse Meetup in Shenzhen](https://www.huodongxing.com/event/3483759917300) on October 20.
* [ClickHouse Meetup in Shanghai](https://www.huodongxing.com/event/4483760336000) on October 27.

View File

@ -54,10 +54,10 @@ public:
const String & host_, UInt16 port_, bool secure_, const String & default_database_,
const String & user_, const String & password_, const String & stage,
bool randomize_, size_t max_iterations_, double max_time_,
const String & json_path_, const ConnectionTimeouts & timeouts, const Settings & settings_)
const String & json_path_, const Settings & settings_)
:
concurrency(concurrency_), delay(delay_), queue(concurrency),
connections(concurrency, host_, port_, default_database_, user_, password_, timeouts, "benchmark", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
connections(concurrency, host_, port_, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
{
@ -240,7 +240,8 @@ private:
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
for (size_t i = 0; i < concurrency; ++i)
pool.schedule(std::bind(&Benchmark::thread, this, connections.get()));
pool.schedule(std::bind(&Benchmark::thread, this,
connections.get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
InterruptListener interrupt_listener;
info_per_interval.watch.restart();
@ -310,7 +311,9 @@ private:
void execute(ConnectionPool::Entry & connection, Query & query)
{
Stopwatch watch;
RemoteBlockInputStream stream(*connection, query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage);
RemoteBlockInputStream stream(
*connection,
query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage);
Progress progress;
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
@ -485,7 +488,6 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["iterations"].as<size_t>(),
options["timelimit"].as<double>(),
options["json"].as<std::string>(),
ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings),
settings);
return benchmark.run();
}
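
The hunks above show the commit's central refactor: ConnectionTimeouts no longer lives inside Connection and the pools; it is computed from the current Settings at the call site and passed to every operation that may block on the socket. A minimal sketch of the new calling convention, using only names that appear in this diff and assuming the surrounding ClickHouse headers:

#include <Client/ConnectionPool.h>
#include <IO/ConnectionTimeouts.h>

/// Sketch only: derive timeouts from the active settings, then hand them
/// to each potentially blocking call instead of storing them in the pool.
void runPing(DB::ConnectionPool & pool, const DB::Settings & settings)
{
    auto timeouts = DB::ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings);
    auto entry = pool.get(timeouts, &settings, /* force_connected = */ true);
    entry->sendQuery(timeouts, "SELECT 1");
}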

View File

@ -204,7 +204,6 @@ private:
ConnectionParameters connection_parameters;
void initialize(Poco::Util::Application & self)
{
Poco::Util::Application::initialize(self);
@ -337,7 +336,7 @@ private:
DateLUT::instance();
if (!context.getSettingsRef().use_client_time_zone)
{
const auto & time_zone = connection->getServerTimezone();
const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
if (!time_zone.empty())
{
try
@ -521,7 +520,6 @@ private:
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
connection_parameters.timeouts,
"client",
connection_parameters.compression,
connection_parameters.security);
@ -537,11 +535,14 @@ private:
connection->setThrottler(throttler);
}
connection->getServerVersion(server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
connection->getServerVersion(connection_parameters.timeouts,
server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_version_patch);
if (server_display_name = connection->getServerDisplayName(); server_display_name.length() == 0)
if (
server_display_name = connection->getServerDisplayName(connection_parameters.timeouts);
server_display_name.length() == 0)
{
server_display_name = config().getString("host", "localhost");
}
@ -752,7 +753,7 @@ private:
}
if (!test_hint.checkActual(actual_server_error, actual_client_error, got_exception, last_exception))
connection->forceConnected();
connection->forceConnected(connection_parameters.timeouts);
if (got_exception && !ignore_error)
{
@ -828,7 +829,7 @@ private:
if (with_output && with_output->settings_ast)
apply_query_settings(*with_output->settings_ast);
connection->forceConnected();
connection->forceConnected(connection_parameters.timeouts);
/// An INSERT query for which data transfer is needed (not an INSERT SELECT) is processed separately.
if (insert && !insert->select)
@ -899,7 +900,7 @@ private:
/// Process the query that doesn't require transferring data blocks to the server.
void processOrdinaryQuery()
{
connection->sendQuery(query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
connection->sendQuery(connection_parameters.timeouts, query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
sendExternalTables();
receiveResult();
}
@ -917,7 +918,7 @@ private:
if (!parsed_insert_query.data && (is_interactive || (stdin_is_not_tty && std_in.eof())))
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
connection->sendQuery(query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
connection->sendQuery(connection_parameters.timeouts, query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
sendExternalTables();
/// Receive description of table structure.
@ -1064,7 +1065,7 @@ private:
bool cancelled = false;
// TODO: get the poll_interval from commandline.
const auto receive_timeout = connection->getTimeouts().receive_timeout;
const auto receive_timeout = connection_parameters.timeouts.receive_timeout;
constexpr size_t default_poll_interval = 1000000; /// in microseconds
constexpr size_t min_poll_interval = 5000; /// in microseconds
const size_t poll_interval

View File

@ -14,6 +14,7 @@
#include <Common/typeid_cast.h>
#include <Columns/ColumnString.h>
#include <Client/Connection.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
@ -42,7 +43,7 @@ private:
"KILL", "QUERY", "SYNC", "ASYNC", "TEST", "BETWEEN", "TRUNCATE"
};
/// Words are fetched asynchonously.
/// Words are fetched asynchronously.
std::thread loading_thread;
std::atomic<bool> ready{false};
@ -71,7 +72,7 @@ private:
return word;
}
void loadImpl(Connection & connection, size_t suggestion_limit)
void loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit)
{
std::stringstream query;
query << "SELECT DISTINCT arrayJoin(extractAll(name, '[\\\\w_]{2,}')) AS res FROM ("
@ -104,12 +105,12 @@ private:
query << ") WHERE notEmpty(res)";
fetch(connection, query.str());
fetch(connection, timeouts, query.str());
}
void fetch(Connection & connection, const std::string & query)
void fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
connection.sendQuery(query);
connection.sendQuery(timeouts, query);
while (true)
{
@ -175,12 +176,11 @@ public:
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
connection_parameters.timeouts,
"client",
connection_parameters.compression,
connection_parameters.security);
loadImpl(connection, suggestion_limit);
loadImpl(connection, connection_parameters.timeouts, suggestion_limit);
}
catch (...)
{

View File

@ -54,6 +54,7 @@
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
@ -798,13 +799,13 @@ public:
}
void discoverShardPartitions(const TaskShardPtr & task_shard)
void discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard)
{
TaskTable & task_table = task_shard->task_table;
LOG_INFO(log, "Discover partitions of shard " << task_shard->getDescription());
auto get_partitions = [&] () { return getShardPartitions(*task_shard); };
auto get_partitions = [&] () { return getShardPartitions(timeouts, *task_shard); };
auto existing_partitions_names = retry(get_partitions, 60);
Strings filtered_partitions_names;
Strings missing_partitions;
@ -880,14 +881,14 @@ public:
}
/// Compute the set of partitions; assume the set of partitions doesn't change during processing
void discoverTablePartitions(TaskTable & task_table, UInt64 num_threads = 0)
void discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads = 0)
{
/// Fetch partitions list from a shard
{
ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
for (const TaskShardPtr & task_shard : task_table.all_shards)
thread_pool.schedule([this, task_shard]() { discoverShardPartitions(task_shard); });
thread_pool.schedule([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
thread_pool.wait();
@ -955,7 +956,7 @@ public:
task_descprtion_current_version = version_to_update;
}
void process()
void process(const ConnectionTimeouts & timeouts)
{
for (TaskTable & task_table : task_cluster->table_tasks)
{
@ -969,7 +970,7 @@ public:
if (!task_table.has_enabled_partitions)
{
/// If there are no specified enabled_partitions, we must discover them manually
discoverTablePartitions(task_table);
discoverTablePartitions(timeouts, task_table);
/// After partitions of each shard are initialized, initialize cluster partitions
for (const TaskShardPtr & task_shard : task_table.all_shards)
@ -1009,7 +1010,7 @@ public:
bool table_is_done = false;
for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries)
{
if (tryProcessTable(task_table))
if (tryProcessTable(timeouts, task_table))
{
table_is_done = true;
break;
@ -1053,8 +1054,10 @@ protected:
return getWorkersPath() + "/" + host_id;
}
zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper,
const String & description, bool unprioritized)
zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(
const zkutil::ZooKeeperPtr & zookeeper,
const String & description,
bool unprioritized)
{
std::chrono::milliseconds current_sleep_time = default_sleep_time;
static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec
@ -1329,7 +1332,7 @@ protected:
static constexpr UInt64 max_table_tries = 1000;
static constexpr UInt64 max_shard_partition_tries = 600;
bool tryProcessTable(TaskTable & task_table)
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
{
/// A heuristic: if the previous shard is already done, then check the next one without sleeps due to the max_workers constraint
bool previous_shard_is_instantly_finished = false;
@ -1360,7 +1363,7 @@ protected:
/// If not, did we check existence of that partition previously?
if (shard->checked_partitions.count(partition_name) == 0)
{
auto check_shard_has_partition = [&] () { return checkShardHasPartition(*shard, partition_name); };
auto check_shard_has_partition = [&] () { return checkShardHasPartition(timeouts, *shard, partition_name); };
bool has_partition = retry(check_shard_has_partition);
shard->checked_partitions.emplace(partition_name);
@ -1397,7 +1400,7 @@ protected:
bool was_error = false;
for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
{
task_status = tryProcessPartitionTask(partition, is_unprioritized_task);
task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task);
/// Exit if success
if (task_status == PartitionTaskStatus::Finished)
@ -1483,13 +1486,13 @@ protected:
Error,
};
PartitionTaskStatus tryProcessPartitionTask(ShardPartition & task_partition, bool is_unprioritized_task)
PartitionTaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
{
PartitionTaskStatus res;
try
{
res = processPartitionTaskImpl(task_partition, is_unprioritized_task);
res = processPartitionTaskImpl(timeouts, task_partition, is_unprioritized_task);
}
catch (...)
{
@ -1510,7 +1513,7 @@ protected:
return res;
}
PartitionTaskStatus processPartitionTaskImpl(ShardPartition & task_partition, bool is_unprioritized_task)
PartitionTaskStatus processPartitionTaskImpl(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
{
TaskShard & task_shard = task_partition.task_shard;
TaskTable & task_table = task_shard.task_table;
@ -1611,7 +1614,7 @@ protected:
zookeeper->createAncestors(current_task_status_path);
/// We need to update table definitions for each partition; they could be changed after ALTER
createShardInternalTables(task_shard);
createShardInternalTables(timeouts, task_shard);
/// Check that the destination partition is empty if we are the first worker
/// NOTE: this check is incorrect if pull and push tables have different partition keys!
@ -1828,23 +1831,25 @@ protected:
return typeid_cast<const ColumnString &>(*block.safeGetByPosition(0).column).getDataAt(0).toString();
}
ASTPtr getCreateTableForPullShard(TaskShard & task_shard)
ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
/// Fetch and parse (possibly) new definition
auto connection_entry = task_shard.info.pool->get(&task_cluster->settings_pull);
String create_query_pull_str = getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry,
&task_cluster->settings_pull);
auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_pull);
String create_query_pull_str = getRemoteCreateTable(
task_shard.task_table.table_pull,
*connection_entry,
&task_cluster->settings_pull);
ParserCreateQuery parser_create_query;
return parseQuery(parser_create_query, create_query_pull_str, 0);
}
void createShardInternalTables(TaskShard & task_shard, bool create_split = true)
void createShardInternalTables(const ConnectionTimeouts & timeouts, TaskShard & task_shard, bool create_split = true)
{
TaskTable & task_table = task_shard.task_table;
/// We need to update table definitions for each part; they could be changed after ALTER
task_shard.current_pull_table_create_query = getCreateTableForPullShard(task_shard);
task_shard.current_pull_table_create_query = getCreateTableForPullShard(timeouts, task_shard);
/// Create local Distributed tables:
/// a table fetching data from current shard and a table inserting data to the whole destination cluster
@ -1872,9 +1877,9 @@ protected:
}
std::set<String> getShardPartitions(TaskShard & task_shard)
std::set<String> getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
createShardInternalTables(task_shard, false);
createShardInternalTables(timeouts, task_shard, false);
TaskTable & task_table = task_shard.task_table;
@ -1914,9 +1919,9 @@ protected:
return res;
}
bool checkShardHasPartition(TaskShard & task_shard, const String & partition_quoted_name)
bool checkShardHasPartition(const ConnectionTimeouts & timeouts, TaskShard & task_shard, const String & partition_quoted_name)
{
createShardInternalTables(task_shard, false);
createShardInternalTables(timeouts, task_shard, false);
TaskTable & task_table = task_shard.task_table;
@ -1998,7 +2003,8 @@ protected:
Settings current_settings = settings ? *settings : task_cluster->settings_common;
current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
auto connections = shard.pool->getMany(&current_settings, pool_mode);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings).getSaturated(current_settings.max_execution_time);
auto connections = shard.pool->getMany(timeouts, &current_settings, pool_mode);
for (auto & connection : connections)
{
@ -2187,7 +2193,7 @@ void ClusterCopierApp::mainImpl()
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
copier->init();
copier->process();
copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef()));
}
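
A detail worth noting in the hunk above: getSaturated() caps the failover timeouts by the query's max_execution_time, so a remote operation cannot outlive its query budget. A sketch of the pattern, with names taken verbatim from this diff:

/// Sketch: failover timeouts, saturated by the query's execution budget.
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings)
                    .getSaturated(current_settings.max_execution_time);
auto connections = shard.pool->getMany(timeouts, &current_settings, pool_mode);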

View File

@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Common/CpuId.h>
#include <common/getMemoryAmount.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -24,6 +25,7 @@ namespace
void waitQuery(Connection & connection)
{
bool finished = false;
while (true)
{
if (!connection.poll(1000000))
@ -50,12 +52,14 @@ namespace fs = boost::filesystem;
PerformanceTest::PerformanceTest(
const XMLConfigurationPtr & config_,
Connection & connection_,
const ConnectionTimeouts & timeouts_,
InterruptListener & interrupt_listener_,
const PerformanceTestInfo & test_info_,
Context & context_,
const std::vector<size_t> & queries_to_run_)
: config(config_)
, connection(connection_)
, timeouts(timeouts_)
, interrupt_listener(interrupt_listener_)
, test_info(test_info_)
, context(context_)
@ -108,7 +112,7 @@ bool PerformanceTest::checkPreconditions() const
size_t exist = 0;
connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
while (true)
{
@ -188,7 +192,7 @@ void PerformanceTest::prepare() const
for (const auto & query : test_info.create_and_fill_queries)
{
LOG_INFO(log, "Executing create or fill query \"" << query << '\"');
connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
waitQuery(connection);
LOG_INFO(log, "Query finished");
}
@ -200,7 +204,7 @@ void PerformanceTest::finish() const
for (const auto & query : test_info.drop_queries)
{
LOG_INFO(log, "Executing drop query \"" << query << '\"');
connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
waitQuery(connection);
LOG_INFO(log, "Query finished");
}

View File

@ -5,6 +5,7 @@
#include <common/logger_useful.h>
#include <Poco/Util/XMLConfiguration.h>
#include <IO/ConnectionTimeouts.h>
#include "PerformanceTestInfo.h"
namespace DB
@ -20,6 +21,7 @@ public:
PerformanceTest(
const XMLConfigurationPtr & config_,
Connection & connection_,
const ConnectionTimeouts & timeouts_,
InterruptListener & interrupt_listener_,
const PerformanceTestInfo & test_info_,
Context & context_,
@ -45,6 +47,7 @@ private:
private:
XMLConfigurationPtr config;
Connection & connection;
const ConnectionTimeouts & timeouts;
InterruptListener & interrupt_listener;
PerformanceTestInfo test_info;

View File

@ -72,10 +72,11 @@ public:
Strings && tests_names_regexp_,
Strings && skip_names_regexp_,
const std::unordered_map<std::string, std::vector<size_t>> query_indexes_,
const ConnectionTimeouts & timeouts)
const ConnectionTimeouts & timeouts_)
: connection(host_, port_, default_database_, user_,
password_, timeouts, "performance-test", Protocol::Compression::Enable,
password_, "performance-test", Protocol::Compression::Enable,
secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable)
, timeouts(timeouts_)
, tests_tags(std::move(tests_tags_))
, tests_names(std::move(tests_names_))
, tests_names_regexp(std::move(tests_names_regexp_))
@ -100,7 +101,7 @@ public:
UInt64 version_minor;
UInt64 version_patch;
UInt64 version_revision;
connection.getServerVersion(name, version_major, version_minor, version_patch, version_revision);
connection.getServerVersion(timeouts, name, version_major, version_minor, version_patch, version_revision);
std::stringstream ss;
ss << version_major << "." << version_minor << "." << version_patch;
@ -115,6 +116,7 @@ public:
private:
Connection connection;
const ConnectionTimeouts & timeouts;
const Strings & tests_tags;
const Strings & tests_names;
@ -195,7 +197,7 @@ private:
{
PerformanceTestInfo info(test_config, profiles_file, global_context.getSettingsRef());
LOG_INFO(log, "Config for test '" << info.test_name << "' parsed");
PerformanceTest current(test_config, connection, interrupt_listener, info, global_context, query_indexes[info.path]);
PerformanceTest current(test_config, connection, timeouts, interrupt_listener, info, global_context, query_indexes[info.path]);
if (current.checkPreconditions())
{

View File

@ -48,7 +48,7 @@ namespace ErrorCodes
}
void Connection::connect()
void Connection::connect(const ConnectionTimeouts & timeouts)
{
try
{
@ -230,10 +230,15 @@ UInt16 Connection::getPort() const
return port;
}
void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch, UInt64 & revision)
void Connection::getServerVersion(const ConnectionTimeouts & timeouts,
String & name,
UInt64 & version_major,
UInt64 & version_minor,
UInt64 & version_patch,
UInt64 & revision)
{
if (!connected)
connect();
connect(timeouts);
name = server_name;
version_major = server_version_major;
@ -242,40 +247,40 @@ void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64
revision = server_revision;
}
UInt64 Connection::getServerRevision()
UInt64 Connection::getServerRevision(const ConnectionTimeouts & timeouts)
{
if (!connected)
connect();
connect(timeouts);
return server_revision;
}
const String & Connection::getServerTimezone()
const String & Connection::getServerTimezone(const ConnectionTimeouts & timeouts)
{
if (!connected)
connect();
connect(timeouts);
return server_timezone;
}
const String & Connection::getServerDisplayName()
const String & Connection::getServerDisplayName(const ConnectionTimeouts & timeouts)
{
if (!connected)
connect();
connect(timeouts);
return server_display_name;
}
void Connection::forceConnected()
void Connection::forceConnected(const ConnectionTimeouts & timeouts)
{
if (!connected)
{
connect();
connect(timeouts);
}
else if (!ping())
{
LOG_TRACE(log_wrapper.get(), "Connection was closed, will reconnect.");
connect();
connect(timeouts);
}
}
@ -318,10 +323,11 @@ bool Connection::ping()
return true;
}
TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & request)
TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & timeouts,
const TablesStatusRequest & request)
{
if (!connected)
connect();
connect(timeouts);
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
@ -344,6 +350,7 @@ TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & req
void Connection::sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_,
UInt64 stage,
@ -352,7 +359,9 @@ void Connection::sendQuery(
bool with_pending_data)
{
if (!connected)
connect();
connect(timeouts);
TimeoutSetter timeout_setter(*socket, timeouts.send_timeout, timeouts.receive_timeout, true);
if (settings)
{

View File

@ -57,7 +57,6 @@ public:
Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const ConnectionTimeouts & timeouts_,
const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Secure secure_ = Protocol::Secure::Disable,
@ -68,7 +67,6 @@ public:
client_name(client_name_),
compression(compression_),
secure(secure_),
timeouts(timeouts_),
sync_request_timeout(sync_request_timeout_),
log_wrapper(*this)
{
@ -106,11 +104,16 @@ public:
/// Change default database. Changes will take effect on next reconnect.
void setDefaultDatabase(const String & database);
void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch, UInt64 & revision);
UInt64 getServerRevision();
void getServerVersion(const ConnectionTimeouts & timeouts,
String & name,
UInt64 & version_major,
UInt64 & version_minor,
UInt64 & version_patch,
UInt64 & revision);
UInt64 getServerRevision(const ConnectionTimeouts & timeouts);
const String & getServerTimezone();
const String & getServerDisplayName();
const String & getServerTimezone(const ConnectionTimeouts & timeouts);
const String & getServerDisplayName(const ConnectionTimeouts & timeouts);
/// For log and exception messages.
const String & getDescription() const;
@ -118,14 +121,9 @@ public:
UInt16 getPort() const;
const String & getDefaultDatabase() const;
/// For proper polling.
inline const auto & getTimeouts() const
{
return timeouts;
}
/// If last flag is true, you need to call sendExternalTablesData after.
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_ = "",
UInt64 stage = QueryProcessingStage::Complete,
@ -156,9 +154,10 @@ public:
Packet receivePacket();
/// If not connected yet, or if the connection is broken, then connect. If unable to connect, throw an exception.
void forceConnected();
void forceConnected(const ConnectionTimeouts & timeouts);
TablesStatusResponse getTablesStatus(const TablesStatusRequest & request);
TablesStatusResponse getTablesStatus(const ConnectionTimeouts & timeouts,
const TablesStatusRequest & request);
/** Disconnect.
* This may be used, if connection is left in unsynchronised state
@ -216,7 +215,6 @@ private:
*/
ThrottlerPtr throttler;
ConnectionTimeouts timeouts;
Poco::Timespan sync_request_timeout;
/// From where to read query execution result.
@ -252,7 +250,7 @@ private:
LoggerWrapper log_wrapper;
void connect();
void connect(const ConnectionTimeouts & timeouts);
void sendHello();
void receiveHello();
bool ping();

View File

@ -30,7 +30,9 @@ public:
/// Selects the connection to work.
/// If force_connected is false, the client must manually ensure that the returned connection is good.
virtual Entry get(const Settings * settings = nullptr, bool force_connected = true) = 0;
virtual Entry get(const ConnectionTimeouts & timeouts,
const Settings * settings = nullptr,
bool force_connected = true) = 0;
};
using ConnectionPoolPtr = std::shared_ptr<IConnectionPool>;
@ -50,7 +52,6 @@ public:
const String & default_database_,
const String & user_,
const String & password_,
const ConnectionTimeouts & timeouts,
const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Secure secure_ = Protocol::Secure::Disable)
@ -63,12 +64,13 @@ public:
password(password_),
client_name(client_name_),
compression(compression_),
secure{secure_},
timeouts(timeouts)
secure{secure_}
{
}
Entry get(const Settings * settings = nullptr, bool force_connected = true) override
Entry get(const ConnectionTimeouts & timeouts,
const Settings * settings = nullptr,
bool force_connected = true) override
{
Entry entry;
if (settings)
@ -77,7 +79,7 @@ public:
entry = Base::get(-1);
if (force_connected)
entry->forceConnected();
entry->forceConnected(timeouts);
return entry;
}
@ -93,7 +95,7 @@ protected:
{
return std::make_shared<Connection>(
host, port,
default_database, user, password, timeouts,
default_database, user, password,
client_name, compression, secure);
}
@ -108,7 +110,6 @@ private:
Protocol::Compression compression; /// Whether to compress data when interacting with the server.
Protocol::Secure secure; /// Whether to encrypt data when interacting with the server.
ConnectionTimeouts timeouts;
};
}

View File

@ -8,6 +8,8 @@
#include <Common/ProfileEvents.h>
#include <Core/Settings.h>
#include <IO/ConnectionTimeouts.h>
namespace ProfileEvents
{
@ -29,9 +31,8 @@ namespace ErrorCodes
ConnectionPoolWithFailover::ConnectionPoolWithFailover(
ConnectionPoolPtrs nested_pools_,
LoadBalancing load_balancing,
size_t max_tries_,
time_t decrease_error_period_)
: Base(std::move(nested_pools_), max_tries_, decrease_error_period_, &Logger::get("ConnectionPoolWithFailover"))
: Base(std::move(nested_pools_), decrease_error_period_, &Logger::get("ConnectionPoolWithFailover"))
, default_load_balancing(load_balancing)
{
const std::string & local_hostname = getFQDNOrHostName();
@ -44,11 +45,13 @@ ConnectionPoolWithFailover::ConnectionPoolWithFailover(
}
}
IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings, bool /*force_connected*/)
IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts & timeouts,
const Settings * settings,
bool /*force_connected*/)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
return tryGetEntry(pool, timeouts, fail_message, settings);
};
GetPriorityFunc get_priority;
@ -70,11 +73,13 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings
return Base::get(try_get_entry, get_priority);
}
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Settings * settings, PoolMode pool_mode)
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
return tryGetEntry(pool, timeouts, fail_message, settings);
};
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry);
@ -86,22 +91,27 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Se
return entries;
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(const Settings * settings, PoolMode pool_mode)
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(
const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings);
return tryGetEntry(pool, timeouts, fail_message, settings);
};
return getManyImpl(settings, pool_mode, try_get_entry);
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check)
const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode,
const QualifiedTableName & table_to_check)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, fail_message, settings, &table_to_check);
return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check);
};
return getManyImpl(settings, pool_mode, try_get_entry);
@ -113,6 +123,9 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
const TryGetEntryFunc & try_get_entry)
{
size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
size_t max_tries = (settings ?
size_t{settings->connections_with_failover_max_tries} :
size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
size_t max_entries;
if (pool_mode == PoolMode::GET_ALL)
{
@ -144,12 +157,13 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
bool fallback_to_stale_replicas = settings ? bool(settings->fallback_to_stale_replicas_for_distributed_queries) : true;
return Base::getMany(min_entries, max_entries, try_get_entry, get_priority, fallback_to_stale_replicas);
return Base::getMany(min_entries, max_entries, max_tries, try_get_entry, get_priority, fallback_to_stale_replicas);
}
ConnectionPoolWithFailover::TryResult
ConnectionPoolWithFailover::tryGetEntry(
IConnectionPool & pool,
const ConnectionTimeouts & timeouts,
std::string & fail_message,
const Settings * settings,
const QualifiedTableName * table_to_check)
@ -157,15 +171,15 @@ ConnectionPoolWithFailover::tryGetEntry(
TryResult result;
try
{
result.entry = pool.get(settings, /* force_connected = */ false);
result.entry = pool.get(timeouts, settings, /* force_connected = */ false);
UInt64 server_revision = 0;
if (table_to_check)
server_revision = result.entry->getServerRevision();
server_revision = result.entry->getServerRevision(timeouts);
if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
{
result.entry->forceConnected();
result.entry->forceConnected(timeouts);
result.is_usable = true;
result.is_up_to_date = true;
return result;
@ -176,7 +190,7 @@ ConnectionPoolWithFailover::tryGetEntry(
TablesStatusRequest status_request;
status_request.tables.emplace(*table_to_check);
TablesStatusResponse status_response = result.entry->getTablesStatus(status_request);
TablesStatusResponse status_response = result.entry->getTablesStatus(timeouts, status_request);
auto table_status_it = status_response.table_states_by_id.find(*table_to_check);
if (table_status_it == status_response.table_states_by_id.end())
{

View File

@ -34,21 +34,24 @@ public:
ConnectionPoolWithFailover(
ConnectionPoolPtrs nested_pools_,
LoadBalancing load_balancing,
size_t max_tries_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
time_t decrease_error_period_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD);
using Entry = IConnectionPool::Entry;
/** Allocates connection to work. */
Entry get(const Settings * settings = nullptr, bool force_connected = true) override; /// From IConnectionPool
Entry get(const ConnectionTimeouts & timeouts,
const Settings * settings = nullptr,
bool force_connected = true) override; /// From IConnectionPool
/** Allocates up to the specified number of connections to work.
* Connections provide access to different replicas of one shard.
*/
std::vector<Entry> getMany(const Settings * settings, PoolMode pool_mode);
std::vector<Entry> getMany(const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode);
/// The same as getMany(), but returns std::vector<TryResult>.
std::vector<TryResult> getManyForTableFunction(const Settings * settings, PoolMode pool_mode);
std::vector<TryResult> getManyForTableFunction(const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode);
using Base = PoolWithFailoverBase<IConnectionPool>;
using TryResult = Base::TryResult;
@ -56,7 +59,10 @@ public:
/// The same as getMany(), but checks that the replication delay for table_to_check is acceptable.
/// Delay threshold is taken from settings.
std::vector<TryResult> getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check);
const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check);
private:
/// Get the values of relevant settings and call Base::getMany()
@ -70,6 +76,7 @@ private:
/// for this table is not too large.
TryResult tryGetEntry(
IConnectionPool & pool,
const ConnectionTimeouts & timeouts,
std::string & fail_message,
const Settings * settings,
const QualifiedTableName * table_to_check = nullptr);
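
The retry budget migrates the same way as the timeouts: max_tries leaves the pool constructor (note the removed max_tries_ parameter above) and becomes an argument of Base::getMany(), resolved per call from the caller's Settings. A hedged sketch using names from this diff:

/// Sketch: a per-call retry budget instead of a per-pool constant.
size_t max_tries = settings
    ? size_t{settings->connections_with_failover_max_tries}
    : size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES};
auto results = Base::getMany(/* min_entries = */ 1, /* max_entries = */ 1, max_tries,
    try_get_entry, get_priority, /* fallback_to_stale_replicas = */ true);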

View File

@ -1,4 +1,5 @@
#include <Client/MultiplexedConnections.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@ -73,6 +74,7 @@ void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesDa
}
void MultiplexedConnections::sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id,
UInt64 stage,
@ -91,7 +93,7 @@ void MultiplexedConnections::sendQuery(
if (!replica.connection)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
if (replica.connection->getServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
if (replica.connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
{
/// Disable two-level aggregation due to version incompatibility.
modified_settings.group_by_two_level_threshold = 0;
@ -107,13 +109,15 @@ void MultiplexedConnections::sendQuery(
for (size_t i = 0; i < num_replicas; ++i)
{
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
replica_states[i].connection->sendQuery(timeouts, query, query_id,
stage, &modified_settings, client_info, with_pending_data);
}
}
else
{
/// Use single replica.
replica_states[0].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
replica_states[0].connection->sendQuery(timeouts, query, query_id, stage,
&modified_settings, client_info, with_pending_data);
}
sent_query = true;

View File

@ -1,9 +1,10 @@
#pragma once
#include <mutex>
#include <Common/Throttler.h>
#include <Client/Connection.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <mutex>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@ -31,6 +32,7 @@ public:
/// Send request to replicas.
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete,

View File

@ -55,11 +55,9 @@ public:
PoolWithFailoverBase(
NestedPools nested_pools_,
size_t max_tries_,
time_t decrease_error_period_,
Logger * log_)
: nested_pools(std::move(nested_pools_))
, max_tries(max_tries_)
, decrease_error_period(decrease_error_period_)
, shared_pool_states(nested_pools.size())
, log(log_)
@ -108,7 +106,7 @@ public:
/// The method will throw if it is unable to get min_entries alive connections or
/// if fallback_to_stale_replicas is false and it is unable to get min_entries connections to up-to-date replicas.
std::vector<TryResult> getMany(
size_t min_entries, size_t max_entries,
size_t min_entries, size_t max_entries, size_t max_tries,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority = GetPriorityFunc(),
bool fallback_to_stale_replicas = true);
@ -125,8 +123,6 @@ protected:
NestedPools nested_pools;
const size_t max_tries;
const time_t decrease_error_period;
std::mutex pool_states_mutex;
@ -141,7 +137,7 @@ template <typename TNestedPool>
typename TNestedPool::Entry
PoolWithFailoverBase<TNestedPool>::get(const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority)
{
std::vector<TryResult> results = getMany(1, 1, try_get_entry, get_priority);
std::vector<TryResult> results = getMany(1, 1, 1, try_get_entry, get_priority);
if (results.empty() || results[0].entry.isNull())
throw DB::Exception(
"PoolWithFailoverBase::getMany() returned less than min_entries entries.",
@ -152,7 +148,7 @@ PoolWithFailoverBase<TNestedPool>::get(const TryGetEntryFunc & try_get_entry, co
template <typename TNestedPool>
std::vector<typename PoolWithFailoverBase<TNestedPool>::TryResult>
PoolWithFailoverBase<TNestedPool>::getMany(
size_t min_entries, size_t max_entries,
size_t min_entries, size_t max_entries, size_t max_tries,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority,
bool fallback_to_stale_replicas)
@ -192,7 +188,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
size_t up_to_date_count = 0;
size_t failed_pools_count = 0;
/// At exit update shared error counts with error counts occured during this call.
/// At exit update shared error counts with error counts occurred during this call.
SCOPE_EXIT(
{
std::lock_guard lock(pool_states_mutex);

View File

@ -0,0 +1,520 @@
#include <cstring>
#include <Compression/CompressionCodecT64.h>
#include <Compression/CompressionFactory.h>
#include <common/unaligned.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
extern const int LOGICAL_ERROR;
}
namespace
{
UInt8 codecId()
{
return static_cast<UInt8>(CompressionMethodByte::T64);
}
TypeIndex baseType(TypeIndex type_idx)
{
switch (type_idx)
{
case TypeIndex::Int8:
return TypeIndex::Int8;
case TypeIndex::Int16:
return TypeIndex::Int16;
case TypeIndex::Int32:
case TypeIndex::Decimal32:
return TypeIndex::Int32;
case TypeIndex::Int64:
case TypeIndex::Decimal64:
return TypeIndex::Int64;
case TypeIndex::UInt8:
case TypeIndex::Enum8:
return TypeIndex::UInt8;
case TypeIndex::UInt16:
case TypeIndex::Enum16:
case TypeIndex::Date:
return TypeIndex::UInt16;
case TypeIndex::UInt32:
case TypeIndex::DateTime:
return TypeIndex::UInt32;
case TypeIndex::UInt64:
return TypeIndex::UInt64;
default:
break;
}
return TypeIndex::Nothing;
}
TypeIndex typeIdx(const DataTypePtr & data_type)
{
if (!data_type)
return TypeIndex::Nothing;
WhichDataType which(*data_type);
switch (which.idx)
{
case TypeIndex::Int8:
case TypeIndex::UInt8:
case TypeIndex::Enum8:
case TypeIndex::Int16:
case TypeIndex::UInt16:
case TypeIndex::Enum16:
case TypeIndex::Date:
case TypeIndex::Int32:
case TypeIndex::UInt32:
case TypeIndex::DateTime:
case TypeIndex::Decimal32:
case TypeIndex::Int64:
case TypeIndex::UInt64:
case TypeIndex::Decimal64:
return which.idx;
default:
break;
}
return TypeIndex::Nothing;
}
void transpose64x8(const UInt64 * src, UInt64 * dst, UInt32)
{
auto * src8 = reinterpret_cast<const UInt8 *>(src);
for (UInt32 i = 0; i < 64; ++i)
{
UInt64 value = src8[i];
dst[0] |= (value & 0x1) << i;
dst[1] |= ((value >> 1) & 0x1) << i;
dst[2] |= ((value >> 2) & 0x1) << i;
dst[3] |= ((value >> 3) & 0x1) << i;
dst[4] |= ((value >> 4) & 0x1) << i;
dst[5] |= ((value >> 5) & 0x1) << i;
dst[6] |= ((value >> 6) & 0x1) << i;
dst[7] |= ((value >> 7) & 0x1) << i;
}
}
void revTranspose64x8(const UInt64 * src, UInt64 * dst, UInt32)
{
auto * dst8 = reinterpret_cast<UInt8 *>(dst);
for (UInt32 i = 0; i < 64; ++i)
{
dst8[i] = ((src[0] >> i) & 0x1)
| (((src[1] >> i) & 0x1) << 1)
| (((src[2] >> i) & 0x1) << 2)
| (((src[3] >> i) & 0x1) << 3)
| (((src[4] >> i) & 0x1) << 4)
| (((src[5] >> i) & 0x1) << 5)
| (((src[6] >> i) & 0x1) << 6)
| (((src[7] >> i) & 0x1) << 7);
}
}
template <typename _T>
void transposeBytes(_T value, UInt64 * mx, UInt32 col)
{
UInt8 * mx8 = reinterpret_cast<UInt8 *>(mx);
const UInt8 * value8 = reinterpret_cast<const UInt8 *>(&value);
if constexpr (sizeof(_T) > 4)
{
mx8[64 * 7 + col] = value8[7];
mx8[64 * 6 + col] = value8[6];
mx8[64 * 5 + col] = value8[5];
mx8[64 * 4 + col] = value8[4];
}
if constexpr (sizeof(_T) > 2)
{
mx8[64 * 3 + col] = value8[3];
mx8[64 * 2 + col] = value8[2];
}
if constexpr (sizeof(_T) > 1)
mx8[64 * 1 + col] = value8[1];
mx8[64 * 0 + col] = value8[0];
}
template <typename _T>
void revTransposeBytes(const UInt64 * mx, UInt32 col, _T & value)
{
auto * mx8 = reinterpret_cast<const UInt8 *>(mx);
if constexpr (sizeof(_T) > 4)
{
value |= UInt64(mx8[64 * 7 + col]) << (8 * 7);
value |= UInt64(mx8[64 * 6 + col]) << (8 * 6);
value |= UInt64(mx8[64 * 5 + col]) << (8 * 5);
value |= UInt64(mx8[64 * 4 + col]) << (8 * 4);
}
if constexpr (sizeof(_T) > 2)
{
value |= UInt32(mx8[64 * 3 + col]) << (8 * 3);
value |= UInt32(mx8[64 * 2 + col]) << (8 * 2);
}
if constexpr (sizeof(_T) > 1)
value |= UInt32(mx8[64 * 1 + col]) << (8 * 1);
value |= UInt32(mx8[col]);
}
/// UIntX[64] -> UInt64[N] transposed matrix, N <= X
template <typename _T>
void transpose(const char * src, char * dst, UInt32 num_bits, UInt32 tail = 64)
{
UInt32 full_bytes = num_bits / 8;
UInt32 part_bits = num_bits % 8;
UInt64 mx[64] = {};
const char * ptr = src;
for (UInt32 col = 0; col < tail; ++col, ptr += sizeof(_T))
{
_T value = unalignedLoad<_T>(ptr);
transposeBytes(value, mx, col);
}
UInt32 full_size = sizeof(UInt64) * (num_bits - part_bits);
memcpy(dst, mx, full_size);
dst += full_size;
/// transpose only partially filled last byte
if (part_bits)
{
UInt64 * partial = &mx[full_bytes * 8];
UInt64 res[8] = {};
transpose64x8(partial, res, part_bits);
memcpy(dst, res, part_bits * sizeof(UInt64));
}
}
/// UInt64[N] transposed matrix -> UIntX[64]
template <typename _T, typename _MinMaxT = std::conditional_t<std::is_signed_v<_T>, Int64, UInt64>>
void revTranspose(const char * src, char * dst, UInt32 num_bits, _MinMaxT min, _MinMaxT max [[maybe_unused]], UInt32 tail = 64)
{
UInt64 mx[64] = {};
memcpy(mx, src, num_bits * sizeof(UInt64));
UInt32 full_bytes = num_bits / 8;
UInt32 part_bits = num_bits % 8;
UInt64 * partial = &mx[full_bytes * 8];
if (part_bits)
{
UInt64 res[8] = {};
revTranspose64x8(partial, res, part_bits);
memcpy(partial, res, 8 * sizeof(UInt64));
}
_T upper_min = 0;
if (num_bits < 64)
upper_min = UInt64(min) >> num_bits << num_bits;
_T buf[64] = {};
if constexpr (std::is_signed_v<_T>)
{
/// Restore some data as negatives and others as positives
if (min < 0 && max >= 0 && num_bits < 64)
{
_T sign_bit = 1ull << (num_bits - 1);
_T upper_max = UInt64(max) >> num_bits << num_bits;
for (UInt32 col = 0; col < tail; ++col)
{
_T & value = buf[col];
revTransposeBytes(mx, col, value);
if (value & sign_bit)
value |= upper_min;
else
value |= upper_max;
}
memcpy(dst, buf, tail * sizeof(_T));
return;
}
}
for (UInt32 col = 0; col < tail; ++col)
{
_T & value = buf[col];
revTransposeBytes(mx, col, value);
value |= upper_min;
}
memcpy(dst, buf, tail * sizeof(_T));
}
UInt32 getValuableBitsNumber(UInt64 min, UInt64 max)
{
UInt64 diff_bits = min ^ max;
if (diff_bits)
return 64 - __builtin_clzll(diff_bits);
return 0;
}
UInt32 getValuableBitsNumber(Int64 min, Int64 max)
{
if (min < 0 && max >= 0)
{
if (min + max >= 0)
return getValuableBitsNumber(0ull, UInt64(max)) + 1;
else
return getValuableBitsNumber(0ull, UInt64(~min)) + 1;
}
else
return getValuableBitsNumber(UInt64(min), UInt64(max));
}
template <typename _T>
void findMinMax(const char * src, UInt32 src_size, _T & min, _T & max)
{
min = unalignedLoad<_T>(src);
max = unalignedLoad<_T>(src);
const char * end = src + src_size;
for (; src < end; src += sizeof(_T))
{
auto current = unalignedLoad<_T>(src);
if (current < min)
min = current;
if (current > max)
max = current;
}
}
template <typename _T>
UInt32 compressData(const char * src, UInt32 bytes_size, char * dst)
{
using MinMaxType = std::conditional_t<std::is_signed_v<_T>, Int64, UInt64>;
const UInt32 mx_size = 64;
const UInt32 header_size = 2 * sizeof(UInt64);
if (bytes_size % sizeof(_T))
throw Exception("Cannot compress, data size " + toString(bytes_size) + " is not multiplier of " + toString(sizeof(_T)),
ErrorCodes::CANNOT_COMPRESS);
UInt32 src_size = bytes_size / sizeof(_T);
UInt32 num_full = src_size / mx_size;
UInt32 tail = src_size % mx_size;
_T min, max;
findMinMax<_T>(src, bytes_size, min, max);
MinMaxType min64 = min;
MinMaxType max64 = max;
/// Write header
{
memcpy(dst, &min64, sizeof(MinMaxType));
memcpy(dst + 8, &max64, sizeof(MinMaxType));
dst += header_size;
}
UInt32 num_bits = getValuableBitsNumber(min64, max64);
if (!num_bits)
return header_size;
UInt32 src_shift = sizeof(_T) * mx_size;
UInt32 dst_shift = sizeof(UInt64) * num_bits;
for (UInt32 i = 0; i < num_full; ++i)
{
transpose<_T>(src, dst, num_bits);
src += src_shift;
dst += dst_shift;
}
UInt32 dst_bytes = num_full * dst_shift;
if (tail)
{
transpose<_T>(src, dst, num_bits, tail);
dst_bytes += dst_shift;
}
return header_size + dst_bytes;
}
template <typename _T>
void decompressData(const char * src, UInt32 bytes_size, char * dst, UInt32 uncompressed_size)
{
using MinMaxType = std::conditional_t<std::is_signed_v<_T>, Int64, UInt64>;
const UInt32 header_size = 2 * sizeof(UInt64);
if (bytes_size < header_size)
throw Exception("Cannot decompress, data size " + toString(bytes_size) + " is less then T64 header",
ErrorCodes::CANNOT_DECOMPRESS);
if (uncompressed_size % sizeof(_T))
throw Exception("Cannot decompress, unexpected uncompressed size " + toString(uncompressed_size),
ErrorCodes::CANNOT_DECOMPRESS);
UInt64 num_elements = uncompressed_size / sizeof(_T);
MinMaxType min;
MinMaxType max;
/// Read header
{
memcpy(&min, src, sizeof(MinMaxType));
memcpy(&max, src + 8, sizeof(MinMaxType));
src += header_size;
bytes_size -= header_size;
}
UInt32 num_bits = getValuableBitsNumber(min, max);
if (!num_bits)
{
_T min_value = min;
for (UInt32 i = 0; i < num_elements; ++i, dst += sizeof(_T))
unalignedStore(dst, min_value);
return;
}
UInt32 src_shift = sizeof(UInt64) * num_bits;
UInt32 dst_shift = sizeof(_T) * 64;
if (!bytes_size || bytes_size % src_shift)
throw Exception("Cannot decompress, data size " + toString(bytes_size) + " is not multiplier of " + toString(src_shift),
ErrorCodes::CANNOT_DECOMPRESS);
UInt32 num_full = bytes_size / src_shift;
UInt32 tail = num_elements % 64;
if (tail)
--num_full;
for (UInt32 i = 0; i < num_full; ++i)
{
revTranspose<_T>(src, dst, num_bits, min, max);
src += src_shift;
dst += dst_shift;
}
if (tail)
revTranspose<_T>(src, dst, num_bits, min, max, tail);
}
}
UInt32 CompressionCodecT64::doCompressData(const char * src, UInt32 src_size, char * dst) const
{
memcpy(dst, &type_idx, 1);
dst += 1;
switch (baseType(type_idx))
{
case TypeIndex::Int8:
return 1 + compressData<Int8>(src, src_size, dst);
case TypeIndex::Int16:
return 1 + compressData<Int16>(src, src_size, dst);
case TypeIndex::Int32:
return 1 + compressData<Int32>(src, src_size, dst);
case TypeIndex::Int64:
return 1 + compressData<Int64>(src, src_size, dst);
case TypeIndex::UInt8:
return 1 + compressData<UInt8>(src, src_size, dst);
case TypeIndex::UInt16:
return 1 + compressData<UInt16>(src, src_size, dst);
case TypeIndex::UInt32:
return 1 + compressData<UInt32>(src, src_size, dst);
case TypeIndex::UInt64:
return 1 + compressData<UInt64>(src, src_size, dst);
default:
break;
}
throw Exception("Connot compress with T64", ErrorCodes::CANNOT_COMPRESS);
}
void CompressionCodecT64::doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const
{
if (!src_size)
throw Exception("Connot decompress with T64", ErrorCodes::CANNOT_DECOMPRESS);
UInt8 saved_type_id = unalignedLoad<UInt8>(src);
src += 1;
src_size -= 1;
TypeIndex actual_type_id = type_idx;
if (actual_type_id == TypeIndex::Nothing)
actual_type_id = static_cast<TypeIndex>(saved_type_id);
switch (baseType(actual_type_id))
{
case TypeIndex::Int8:
return decompressData<Int8>(src, src_size, dst, uncompressed_size);
case TypeIndex::Int16:
return decompressData<Int16>(src, src_size, dst, uncompressed_size);
case TypeIndex::Int32:
return decompressData<Int32>(src, src_size, dst, uncompressed_size);
case TypeIndex::Int64:
return decompressData<Int64>(src, src_size, dst, uncompressed_size);
case TypeIndex::UInt8:
return decompressData<UInt8>(src, src_size, dst, uncompressed_size);
case TypeIndex::UInt16:
return decompressData<UInt16>(src, src_size, dst, uncompressed_size);
case TypeIndex::UInt32:
return decompressData<UInt32>(src, src_size, dst, uncompressed_size);
case TypeIndex::UInt64:
return decompressData<UInt64>(src, src_size, dst, uncompressed_size);
default:
break;
}
throw Exception("Connot decompress with T64", ErrorCodes::CANNOT_DECOMPRESS);
}
void CompressionCodecT64::useInfoAboutType(DataTypePtr data_type)
{
if (data_type)
{
type_idx = typeIdx(data_type);
if (type_idx == TypeIndex::Nothing)
throw Exception("T64 codec is not supported for specified type", ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
}
}
UInt8 CompressionCodecT64::getMethodByte() const
{
return codecId();
}
void registerCodecT64(CompressionCodecFactory & factory)
{
auto reg_func = [&](const ASTPtr & arguments, DataTypePtr type) -> CompressionCodecPtr
{
if (arguments && !arguments->children.empty())
throw Exception("T64 codec should not have parameters", ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
auto type_idx = typeIdx(type);
if (type && type_idx == TypeIndex::Nothing)
throw Exception("T64 codec is not supported for specified type", ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
return std::make_shared<CompressionCodecT64>(type_idx);
};
factory.registerCompressionCodecWithType("T64", codecId(), reg_func);
}
}
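
The new codec's ratio is driven entirely by the min/max header: after transposition only the bit planes in which min and max differ are kept, so a frame of 64 values costs num_bits words of 8 bytes instead of 64 full-width values. A standalone sketch of that arithmetic (assumed helper names, not ClickHouse code), mirroring getValuableBitsNumber() above:

#include <cstdint>
#include <cstdio>

/// Same computation as getValuableBitsNumber() above, for unsigned values.
static uint32_t valuableBits(uint64_t mn, uint64_t mx)
{
    uint64_t diff = mn ^ mx;
    return diff ? 64 - __builtin_clzll(diff) : 0;
}

int main()
{
    /// 64 UInt32 values in [1000, 1015]: min and max differ only in the
    /// low 5 bits, so each 64-value frame stores 5 UInt64 bit planes.
    uint32_t num_bits = valuableBits(1000, 1015);   // 5
    size_t raw = 64 * sizeof(uint32_t);             // 256 bytes
    size_t packed = num_bits * sizeof(uint64_t);    // 40 bytes (plus header)
    printf("%u bits: %zu -> %zu bytes per frame\n", num_bits, raw, packed);
}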

View File

@ -0,0 +1,38 @@
#pragma once
#include <Core/Types.h>
#include <Compression/ICompressionCodec.h>
namespace DB
{
class CompressionCodecT64 : public ICompressionCodec
{
public:
static constexpr UInt32 HEADER_SIZE = 1 + 2 * sizeof(UInt64);
static constexpr UInt32 MAX_COMPRESSED_BLOCK_SIZE = sizeof(UInt64) * 64;
CompressionCodecT64(TypeIndex type_idx_)
: type_idx(type_idx_)
{}
UInt8 getMethodByte() const override;
String getCodecDesc() const override { return "T64"; }
void useInfoAboutType(DataTypePtr data_type) override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override
{
/// uncompressed_size - (uncompressed_size % (sizeof(T) * 64)) + sizeof(UInt64) * sizeof(T) + header_size
return uncompressed_size + MAX_COMPRESSED_BLOCK_SIZE + HEADER_SIZE;
}
private:
TypeIndex type_idx;
};
}
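
The worst-case bound in getMaxCompressedDataSize() above is easy to sanity-check: HEADER_SIZE is one type byte plus the two 8-byte min/max values, and MAX_COMPRESSED_BLOCK_SIZE is one fully transposed 64-value frame of UInt64 planes. A small compile-time sketch of the numbers:

#include <cstdint>

/// Sketch: the bound is input size + one worst-case frame + codec header.
constexpr uint32_t header_size = 1 + 2 * sizeof(uint64_t); /// type byte + min + max = 17
constexpr uint32_t max_block = sizeof(uint64_t) * 64;      /// one transposed frame = 512
static_assert(header_size == 17 && max_block == 512, "matches HEADER_SIZE and MAX_COMPRESSED_BLOCK_SIZE");
static_assert(1000 + max_block + header_size == 1529, "bound for a 1000-byte block");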

View File

@ -137,6 +137,7 @@ void registerCodecZSTD(CompressionCodecFactory & factory);
void registerCodecMultiple(CompressionCodecFactory & factory);
void registerCodecLZ4HC(CompressionCodecFactory & factory);
void registerCodecDelta(CompressionCodecFactory & factory);
void registerCodecT64(CompressionCodecFactory & factory);
CompressionCodecFactory::CompressionCodecFactory()
{
@ -147,6 +148,7 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecMultiple(*this);
registerCodecLZ4HC(*this);
registerCodecDelta(*this);
registerCodecT64(*this);
}
}

View File

@ -40,6 +40,7 @@ enum class CompressionMethodByte : uint8_t
ZSTD = 0x90,
Multiple = 0x91,
Delta = 0x92,
T64 = 0x93,
};
}

View File

@ -7,6 +7,8 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Storages/IStorage.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@ -61,17 +63,17 @@ RemoteBlockInputStream::RemoteBlockInputStream(
create_multiplexed_connections = [this, pool, throttler]()
{
const Settings & current_settings = context.getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
std::vector<IConnectionPool::Entry> connections;
if (main_table)
{
auto try_results = pool->getManyChecked(&current_settings, pool_mode, *main_table);
auto try_results = pool->getManyChecked(timeouts, &current_settings, pool_mode, *main_table);
connections.reserve(try_results.size());
for (auto & try_result : try_results)
connections.emplace_back(std::move(try_result.entry));
}
else
connections = pool->getMany(&current_settings, pool_mode);
connections = pool->getMany(timeouts, &current_settings, pool_mode);
return std::make_unique<MultiplexedConnections>(
std::move(connections), current_settings, throttler);
@ -283,12 +285,14 @@ void RemoteBlockInputStream::sendQuery()
{
multiplexed_connections = create_multiplexed_connections();
if (context.getSettingsRef().skip_unavailable_shards && 0 == multiplexed_connections->size())
const auto& settings = context.getSettingsRef();
if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
return;
established = true;
multiplexed_connections->sendQuery(query, "", stage, &context.getClientInfo(), true);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
multiplexed_connections->sendQuery(timeouts, query, "", stage, &context.getClientInfo(), true);
established = false;
sent_query = true;

View File

@ -96,6 +96,7 @@ private:
const String query;
Context context;
/// Temporary tables that need to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
@ -118,7 +119,7 @@ private:
*/
std::atomic<bool> finished { false };
/** Cancel query request was sent to all replicas beacuse data is not needed anymore
/** Cancel query request was sent to all replicas because data is not needed anymore
* This behaviour may occur when:
* - data size is already satisfactory (when using LIMIT, for example)
* - an exception was thrown from client side

View File

@ -6,6 +6,7 @@
#include <Common/NetException.h>
#include <Common/CurrentThread.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
@ -18,13 +19,16 @@ namespace ErrorCodes
}
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_)
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
const ConnectionTimeouts & timeouts,
const String & query_,
const Settings * settings_)
: connection(connection_), query(query_), settings(settings_)
{
/** Send query and receive "header", that describe table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/
connection.sendQuery(query, "", QueryProcessingStage::Complete, settings, nullptr);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, settings, nullptr);
while (true)
{

View File

@ -3,6 +3,7 @@
#include <Core/Block.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/Throttler.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
@ -18,7 +19,10 @@ struct Settings;
class RemoteBlockOutputStream : public IBlockOutputStream
{
public:
RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_ = nullptr);
RemoteBlockOutputStream(Connection & connection_,
const ConnectionTimeouts & timeouts,
const String & query_,
const Settings * settings_ = nullptr);
Block getHeader() const override { return header; }

View File

@ -43,10 +43,13 @@ private:
public:
using ExceptionCallback = std::function<void()>;
UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads,
ExceptionCallback exception_callback_ = ExceptionCallback()) :
output_queue(std::min(inputs.size(), max_threads)),
handler(*this),
UnionBlockInputStream(
BlockInputStreams inputs,
BlockInputStreamPtr additional_input_at_end,
size_t max_threads,
ExceptionCallback exception_callback_ = ExceptionCallback()
) :
output_queue(std::min(inputs.size(), max_threads)), handler(*this),
processor(inputs, additional_input_at_end, max_threads, handler),
exception_callback(exception_callback_)
{

View File

@ -30,10 +30,8 @@ static ConnectionPoolWithFailoverPtr createPool(
bool secure,
const std::string & db,
const std::string & user,
const std::string & password,
const Context & context)
const std::string & password)
{
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(context.getSettingsRef());
ConnectionPoolPtrs pools;
pools.emplace_back(std::make_shared<ConnectionPool>(
MAX_CONNECTIONS,
@ -42,7 +40,6 @@ static ConnectionPoolWithFailoverPtr createPool(
db,
user,
password,
timeouts,
"ClickHouseDictionarySource",
Protocol::Compression::Enable,
secure ? Protocol::Secure::Enable : Protocol::Secure::Disable));
@ -72,7 +69,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
, sample_block{sample_block}
, context(context_)
, is_local{isLocalAddress({host, port}, context.getTCPPort())}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
, load_all_query{query_builder.composeLoadAllQuery()}
{
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
@ -98,7 +95,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
, sample_block{other.sample_block}
, context(other.context)
, is_local{other.is_local}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
, load_all_query{other.load_all_query}
{
}
@ -179,6 +176,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
{
if (is_local)
return executeQuery(query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
}

View File

@ -13,6 +13,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsFetch = 2;
extern const StorageActionBlockType PartsSend = 3;
extern const StorageActionBlockType ReplicationQueue = 4;
extern const StorageActionBlockType DistributedSend = 5;
}

View File

@ -250,11 +250,10 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
settings.distributed_connections_pool_size,
address.host_name, address.port,
address.default_database, address.user, address.password,
ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings).getSaturated(settings.max_execution_time),
"server", address.compression, address.secure);
info.pool = std::make_shared<ConnectionPoolWithFailover>(
ConnectionPoolPtrs{pool}, settings.load_balancing, settings.connections_with_failover_max_tries);
ConnectionPoolPtrs{pool}, settings.load_balancing);
info.per_replica_pools = {std::move(pool)};
if (weight)
@ -322,7 +321,6 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
ConnectionTimeouts::getTCPTimeoutsWithFailover(settings).getSaturated(settings.max_execution_time),
"server", replica.compression, replica.secure);
all_replicas_pools.emplace_back(replica_pool);
@ -331,7 +329,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
all_replicas_pools, settings.load_balancing, settings.connections_with_failover_max_tries);
all_replicas_pools, settings.load_balancing);
if (weight)
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
@ -375,7 +373,6 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
ConnectionTimeouts::getTCPTimeoutsWithFailover(settings).getSaturated(settings.max_execution_time),
"server", replica.compression, replica.secure);
all_replicas.emplace_back(replica_pool);
if (replica.is_local && !treat_local_as_remote)
@ -383,7 +380,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
all_replicas, settings.load_balancing, settings.connections_with_failover_max_tries);
all_replicas, settings.load_balancing);
slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size());
shards_info.push_back({{}, current_shard_num, default_weight, std::move(shard_local_addresses), std::move(shard_pool),

View File

@ -184,13 +184,17 @@ void SelectStreamFactory::createForShard(
local_delay]()
-> BlockInputStreamPtr
{
auto current_settings = context.getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
current_settings).getSaturated(
current_settings.max_execution_time);
std::vector<ConnectionPoolWithFailover::TryResult> try_results;
try
{
if (table_func_ptr)
try_results = pool->getManyForTableFunction(&context.getSettingsRef(), PoolMode::GET_MANY);
try_results = pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
else
try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
try_results = pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, main_table);
}
catch (const Exception & ex)
{

View File

@ -15,6 +15,7 @@
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Databases/IDatabase.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/StorageFactory.h>
#include <Parsers/ASTSystemQuery.h>
@ -22,6 +23,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <csignal>
#include <algorithm>
#include "InterpreterSystemQuery.h"
namespace DB
@ -42,6 +44,7 @@ namespace ActionLocks
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType ReplicationQueue;
extern StorageActionBlockType DistributedSend;
}
@ -194,9 +197,18 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_REPLICATION_QUEUES:
startStopAction(context, query, ActionLocks::ReplicationQueue, true);
break;
case Type::STOP_DISTRIBUTED_SENDS:
startStopAction(context, query, ActionLocks::DistributedSend, false);
break;
case Type::START_DISTRIBUTED_SENDS:
startStopAction(context, query, ActionLocks::DistributedSend, true);
break;
case Type::SYNC_REPLICA:
syncReplica(query);
break;
case Type::FLUSH_DISTRIBUTED:
flushDistributed(query);
break;
case Type::RESTART_REPLICAS:
restartReplicas(system_context);
break;
@ -303,11 +315,21 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
StoragePtr table = context.getTable(database_name, table_name);
auto table_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get());
if (!table_replicated)
if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.value.milliseconds());
else
throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
}
table_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.value.milliseconds());
void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
{
String database_name = !query.target_database.empty() ? query.target_database : context.getCurrentDatabase();
String & table_name = query.target_table;
if (auto storage_distributed = dynamic_cast<StorageDistributed *>(context.getTable(database_name, table_name).get()))
storage_distributed->flushClusterNodesAllData();
else
throw Exception("Table " + database_name + "." + table_name + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
}
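
Together with the StorageDistributed changes below, these cases form a stop/resume/flush workflow for Distributed tables. A sketch of the intended usage, modeled on the integration test added later in this diff (db.dist is a placeholder table):

SYSTEM STOP DISTRIBUTED SENDS db.dist;   -- inserts are queued locally, not forwarded
INSERT INTO db.dist VALUES (1, 'row');   -- lands in the directory monitor queue
SYSTEM START DISTRIBUTED SENDS db.dist;  -- background sending resumes
SYSTEM FLUSH DISTRIBUTED db.dist;        -- synchronously pushes all queued data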

View File

@ -31,6 +31,7 @@ private:
void restartReplicas(Context & system_context);
void syncReplica(ASTSystemQuery & query);
void flushDistributed(ASTSystemQuery & query);
};

View File

@ -41,6 +41,8 @@ const char * ASTSystemQuery::typeToString(Type type)
return "RESTART REPLICA";
case Type::SYNC_REPLICA:
return "SYNC REPLICA";
case Type::FLUSH_DISTRIBUTED:
return "FLUSH DISTRIBUTED";
case Type::RELOAD_DICTIONARY:
return "RELOAD DICTIONARY";
case Type::RELOAD_DICTIONARIES:
@ -65,6 +67,10 @@ const char * ASTSystemQuery::typeToString(Type type)
return "STOP REPLICATION QUEUES";
case Type::START_REPLICATION_QUEUES:
return "START REPLICATION QUEUES";
case Type::STOP_DISTRIBUTED_SENDS:
return "STOP DISTRIBUTED SENDS";
case Type::START_DISTRIBUTED_SENDS:
return "START DISTRIBUTED SENDS";
case Type::FLUSH_LOGS:
return "FLUSH LOGS";
default:
@ -99,12 +105,14 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|| type == Type::STOP_REPLICATED_SENDS
|| type == Type::START_REPLICATED_SENDS
|| type == Type::STOP_REPLICATION_QUEUES
|| type == Type::START_REPLICATION_QUEUES)
|| type == Type::START_REPLICATION_QUEUES
|| type == Type::STOP_DISTRIBUTED_SENDS
|| type == Type::START_DISTRIBUTED_SENDS)
{
if (!target_table.empty())
print_database_table();
}
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA)
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::FLUSH_DISTRIBUTED)
{
print_database_table();
}

View File

@ -40,6 +40,9 @@ public:
STOP_REPLICATION_QUEUES,
START_REPLICATION_QUEUES,
FLUSH_LOGS,
FLUSH_DISTRIBUTED,
STOP_DISTRIBUTED_SENDS,
START_DISTRIBUTED_SENDS,
END
};

View File

@ -49,6 +49,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::RESTART_REPLICA:
case Type::SYNC_REPLICA:
case Type::FLUSH_DISTRIBUTED:
if (!parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table))
return false;
break;
@ -61,6 +62,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::START_REPLICATED_SENDS:
case Type::STOP_REPLICATION_QUEUES:
case Type::START_REPLICATION_QUEUES:
case Type::STOP_DISTRIBUTED_SENDS:
case Type::START_DISTRIBUTED_SENDS:
parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table);
break;
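
Note the asymmetry in the parser: FLUSH DISTRIBUTED fails without a database/table name, while for STOP/START DISTRIBUTED SENDS the name is optional and its absence presumably scopes the action to all Distributed tables. Under that reading of the hunk, both forms parse:

SYSTEM STOP DISTRIBUTED SENDS;           -- no table given: applies globally
SYSTEM STOP DISTRIBUTED SENDS db.dist;   -- scoped to one Distributed table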

View File

@ -11,6 +11,7 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <boost/algorithm/string/find_iterator.hpp>
@ -29,6 +30,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ABORTED;
extern const int INCORRECT_FILE_NAME;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int TOO_LARGE_SIZE_COMPRESSED;
@ -57,12 +59,14 @@ namespace
}
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool)
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool, ActionBlocker & monitor_blocker)
: storage(storage), pool{pool}, path{storage.path + name + '/'}
, current_batch_file_path{path + "current_batch.txt"}
, default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, sleep_time{default_sleep_time}
, log{&Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker)
{
const Settings & settings = storage.global_context.getSettingsRef();
should_batch_inserts = settings.distributed_directory_monitor_batch_inserts;
@ -84,6 +88,14 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
}
}
void StorageDistributedDirectoryMonitor::flushAllData()
{
if (!quit)
{
std::unique_lock lock{mutex};
processFiles();
}
}
void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
{
@ -113,18 +125,25 @@ void StorageDistributedDirectoryMonitor::run()
{
auto do_sleep = true;
try
if (!monitor_blocker.isCancelled())
{
do_sleep = !findFiles();
try
{
do_sleep = !processFiles();
}
catch (...)
{
do_sleep = true;
++error_count;
sleep_time = std::min(
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
std::chrono::milliseconds{max_sleep_time});
tryLogCurrentException(getLoggerName().data());
}
}
catch (...)
else
{
do_sleep = true;
++error_count;
sleep_time = std::min(
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
std::chrono::milliseconds{max_sleep_time});
tryLogCurrentException(getLoggerName().data());
LOG_DEBUG(log, "Skipping sending data over distributed table.");
}
if (do_sleep)
@ -142,8 +161,7 @@ void StorageDistributedDirectoryMonitor::run()
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
const auto pool_factory = [&storage, &timeouts] (const Cluster::Address & address) -> ConnectionPoolPtr
const auto pool_factory = [&storage] (const Cluster::Address & address) -> ConnectionPoolPtr
{
const auto & cluster = storage.getCluster();
const auto & shards_info = cluster->getShardsInfo();
@ -164,7 +182,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
}
return std::make_shared<ConnectionPool>(
1, address.host_name, address.port, address.default_database, address.user, address.password, timeouts,
1, address.host_name, address.port, address.default_database, address.user, address.password,
storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);
};
@ -174,7 +192,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
}
bool StorageDistributedDirectoryMonitor::findFiles()
bool StorageDistributedDirectoryMonitor::processFiles()
{
std::map<UInt64, std::string> files;
@ -212,7 +230,8 @@ bool StorageDistributedDirectoryMonitor::findFiles()
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
{
LOG_TRACE(log, "Started processing `" << file_path << '`');
auto connection = pool->get();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
auto connection = pool->get(timeouts);
try
{
@ -224,7 +243,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
std::string insert_query;
readQueryAndSettings(in, insert_settings, insert_query);
RemoteBlockOutputStream remote{*connection, insert_query, &insert_settings};
RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings};
remote.writePrefix();
remote.writePrepared(in);
@ -334,8 +353,8 @@ struct StorageDistributedDirectoryMonitor::Batch
WriteBufferFromFile out{parent.current_batch_file_path};
writeText(out);
}
auto connection = parent.pool->get();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context.getSettingsRef());
auto connection = parent.pool->get(timeouts);
bool batch_broken = false;
try
@ -361,7 +380,7 @@ struct StorageDistributedDirectoryMonitor::Batch
if (first)
{
first = false;
remote = std::make_unique<RemoteBlockOutputStream>(*connection, insert_query, &insert_settings);
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, &insert_settings);
remote->writePrefix();
}

View File

@ -19,15 +19,19 @@ namespace DB
class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool);
StorageDistributedDirectoryMonitor(
StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool, ActionBlocker & monitor_blocker);
~StorageDistributedDirectoryMonitor();
static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
void flushAllData();
void shutdownAndDropAllData();
private:
void run();
bool findFiles();
bool processFiles();
void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files);
@ -57,6 +61,7 @@ private:
std::mutex mutex;
std::condition_variable cond;
Logger * log;
ActionBlocker & monitor_blocker;
ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};
/// Read insert query and insert settings for backward compatible.

View File

@ -242,6 +242,8 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
{
if (!job.stream)
{
const Settings & settings = context.getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
if (shard_info.hasInternalReplication())
{
/// Skip replica_index in case of internal replication
@ -249,7 +251,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
throw Exception("There are several writing job for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR);
/// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here
auto connections = shard_info.pool->getMany(&context.getSettingsRef(), PoolMode::GET_ONE);
auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE);
if (connections.empty() || connections.front().isNull())
throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR);
@ -263,7 +265,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
if (!connection_pool)
throw Exception("Connection pool for replica " + replica.readableString() + " does not exist", ErrorCodes::LOGICAL_ERROR);
job.connection_entry = connection_pool->get(&context.getSettingsRef());
job.connection_entry = connection_pool->get(timeouts, &settings);
if (job.connection_entry.isNull())
throw Exception("Got empty connection for replica" + replica.readableString(), ErrorCodes::LOGICAL_ERROR);
}
@ -271,7 +273,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
if (throttler)
job.connection_entry->setThrottler(throttler);
job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, query_string, &context.getSettingsRef());
job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, &settings);
job.stream->writePrefix();
}

View File

@ -400,12 +400,19 @@ void MergeTreeDataPart::remove() const
{
/// Remove each expected file in directory, then remove directory itself.
#if !__clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
#endif
for (const auto & [file, _] : checksums.files)
{
String path_to_remove = to + "/" + file;
if (0 != unlink(path_to_remove.c_str()))
throwFromErrno("Cannot unlink file " + path_to_remove, ErrorCodes::CANNOT_UNLINK);
}
#if !__clang__
#pragma GCC diagnostic pop
#endif
for (const auto & file : {"checksums.txt", "columns.txt"})
{

View File

@ -65,6 +65,10 @@ namespace ErrorCodes
extern const int TOO_MANY_ROWS;
}
namespace ActionLocks
{
extern const StorageActionBlockType DistributedSend;
}
namespace
{
@ -427,7 +431,7 @@ void StorageDistributed::createDirectoryMonitors()
void StorageDistributed::requireDirectoryMonitor(const std::string & name)
{
std::lock_guard lock(cluster_nodes_mutex);
cluster_nodes_data[name].requireDirectoryMonitor(name, *this);
cluster_nodes_data[name].requireDirectoryMonitor(name, *this, monitors_blocker);
}
ConnectionPoolPtr StorageDistributed::requireConnectionPool(const std::string & name)
@ -454,11 +458,17 @@ void StorageDistributed::ClusterNodeData::requireConnectionPool(const std::strin
conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, storage);
}
void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(const std::string & name, StorageDistributed & storage)
void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(
const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker)
{
requireConnectionPool(name, storage);
if (!directory_monitor)
directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(storage, name, conneciton_pool);
directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(storage, name, conneciton_pool, monitor_blocker);
}
void StorageDistributed::ClusterNodeData::flushAllData()
{
directory_monitor->flushAllData();
}
void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
@ -499,6 +509,22 @@ ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const Select
return cluster->getClusterWithMultipleShards({shards.begin(), shards.end()});
}
ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
{
if (type == ActionLocks::DistributedSend)
return monitors_blocker.cancel();
return {};
}
void StorageDistributed::flushClusterNodesAllData()
{
std::lock_guard lock(cluster_nodes_mutex);
/// TODO: Maybe it should be executed in parallel
for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end(); ++it)
it->second.flushAllData();
}
void registerStorageDistributed(StorageFactory & factory)
{

View File

@ -11,6 +11,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTFunction.h>
#include <common/logger_useful.h>
#include <Common/ActionBlocker.h>
namespace DB
@ -105,8 +106,11 @@ public:
/// ensure connection pool creation and return it
ConnectionPoolPtr requireConnectionPool(const std::string & name);
void flushClusterNodesAllData();
ClusterPtr getCluster() const;
ActionLock getActionLock(StorageActionBlockType type) override;
String table_name;
String remote_database;
@ -135,7 +139,9 @@ public:
/// Creates connection_pool if not exists.
void requireConnectionPool(const std::string & name, const StorageDistributed & storage);
/// Creates directory_monitor if not exists.
void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage);
void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker);
void flushAllData();
void shutdownAndDropAllData();
};
@ -145,6 +151,8 @@ public:
/// Used for global monotonic ordering of files to send.
SimpleIncrement file_names_increment;
ActionBlocker monitors_blocker;
protected:
StorageDistributed(
const String & database_name,

View File

@ -370,6 +370,11 @@ void StorageMaterializedView::checkPartitionCanBeDropped(const ASTPtr & partitio
target_table->checkPartitionCanBeDropped(partition);
}
ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type)
{
return has_inner_table ? getTargetTable()->getActionLock(type) : ActionLock{};
}
void registerStorageMaterializedView(StorageFactory & factory)
{
factory.registerStorage("MaterializedView", [](const StorageFactory::Arguments & args)

View File

@ -51,6 +51,8 @@ public:
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;
ActionLock getActionLock(StorageActionBlockType type) override;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,

View File

@ -3986,8 +3986,6 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
else
throw Exception("Can't proxy this query. Unsupported query type", ErrorCodes::NOT_IMPLEMENTED);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithoutFailover(global_context.getSettingsRef());
const auto & query_settings = query_context.getSettingsRef();
const auto & query_client_info = query_context.getClientInfo();
String user = query_client_info.current_user;
@ -4003,7 +4001,7 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
leader_address.host,
leader_address.queries_port,
leader_address.database,
user, password, timeouts, "Follower replica");
user, password, "Follower replica");
std::stringstream new_query_ss;
formatAST(*new_query, new_query_ss, false, true);

View File

@ -54,8 +54,8 @@ def run_single_test(args, ext, server_logs_level, case_file, stdout_file, stderr
sleep(0.01)
# Normalize randomized database names in stdout, stderr files.
os.system("sed -i 's/{test_db}/default/g' {file}".format(test_db=args.database, file=stdout_file))
os.system("sed -i 's/{test_db}/default/g' {file}".format(test_db=args.database, file=stderr_file))
os.system("sed -i -e 's/{test_db}/default/g' {file}".format(test_db=args.database, file=stdout_file))
os.system("sed -i -e 's/{test_db}/default/g' {file}".format(test_db=args.database, file=stderr_file))
stdout = open(stdout_file, 'r').read() if os.path.exists(stdout_file) else ''
stdout = unicode(stdout, errors='replace', encoding='utf-8')
@ -416,7 +416,12 @@ def find_binary(name):
return True
# maybe it wasn't in PATH
return os.access(os.path.join('/usr/bin', name), os.X_OK)
if os.access(os.path.join('/usr/local/bin', name), os.X_OK):
return True
if os.access(os.path.join('/usr/bin', name), os.X_OK):
return True
return False
if __name__ == '__main__':
parser=ArgumentParser(description='ClickHouse functional tests')

View File

@ -14,7 +14,7 @@ Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal kafka-python`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal kafka-python protobuf`
(strongly discouraged) If you really want to use OS packages on modern Debian/Ubuntu instead of pip: `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-pymongo python-tzlocal python-kazoo python-psycopg2 python-kafka`

View File

@ -0,0 +1,18 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,35 @@
<yandex>
<profiles>
<default>
<connections_with_failover_max_tries>3</connections_with_failover_max_tries>
<connect_timeout_with_failover_ms>1000</connect_timeout_with_failover_ms>
<min_insert_block_size_rows>1</min_insert_block_size_rows>
</default>
<delays>
<connections_with_failover_max_tries>5</connections_with_failover_max_tries>
<connect_timeout_with_failover_ms>3000</connect_timeout_with_failover_ms>
<min_insert_block_size_rows>1</min_insert_block_size_rows>
</delays>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
<ready_to_wait>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>delays</profile>
<quota>default</quota>
</ready_to_wait>
</users>
<quotas><default></default></quotas>
</yandex>

View File

@ -0,0 +1,141 @@
import itertools
import timeit
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
NODES = {'node' + str(i): cluster.add_instance(
'node' + str(i),
main_configs=['configs/remote_servers.xml'],
user_configs=['configs/set_distributed_defaults.xml'],
) for i in (1, 2)}
CREATE_TABLES_SQL = '''
CREATE DATABASE test;
CREATE TABLE base_table(
node String
)
ENGINE = MergeTree
PARTITION BY node
ORDER BY node;
CREATE TABLE distributed_table
ENGINE = Distributed(test_cluster, default, base_table) AS base_table;
'''
INSERT_SQL_TEMPLATE = "INSERT INTO base_table VALUES ('{node_id}')"
SELECTS_SQL = {
'distributed': 'SELECT node FROM distributed_table ORDER BY node',
'remote': ("SELECT node FROM remote('node1,node2', default.base_table) "
"ORDER BY node"),
}
EXCEPTION_NETWORK = 'e.displayText() = DB::NetException: '
EXCEPTION_TIMEOUT = 'Timeout exceeded while reading from socket ('
EXCEPTION_CONNECT = 'Timeout: connect timed out: '
TIMEOUT_MEASUREMENT_EPS = 0.01
EXPECTED_BEHAVIOR = {
'default': {
'times': 3,
'timeout': 1,
},
'ready_to_wait': {
'times': 5,
'timeout': 3,
},
}
def _check_exception(exception, expected_tries=3):
lines = exception.split('\n')
assert len(lines) > 4, "Unexpected exception (expected: timeout info)"
assert lines[0].startswith('Received exception from server (version')
assert lines[1].startswith('Code: 279')
assert lines[1].endswith('All connection tries failed. Log: ')
assert lines[2] == '', "Unexpected exception text (expected: empty line)"
for i, line in enumerate(lines[3:3 + expected_tries]):
expected_lines = (
'Code: 209, ' + EXCEPTION_NETWORK + EXCEPTION_TIMEOUT,
'Code: 209, ' + EXCEPTION_NETWORK + EXCEPTION_CONNECT,
)
assert any(line.startswith(expected) for expected in expected_lines), \
'Unexpected exception at one of the connection attempts'
assert lines[3 + expected_tries] == '', 'Wrong number of connect attempts'
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for node_id, node in NODES.items():
node.query(CREATE_TABLES_SQL)
node.query(INSERT_SQL_TEMPLATE.format(node_id=node_id))
yield cluster
finally:
cluster.shutdown()
def _check_timeout_and_exception(node, user, query_base):
repeats = EXPECTED_BEHAVIOR[user]['times']
expected_timeout = EXPECTED_BEHAVIOR[user]['timeout'] * repeats
start = timeit.default_timer()
exception = node.query_and_get_error(SELECTS_SQL[query_base], user=user)
# And it should time out no faster than:
measured_timeout = timeit.default_timer() - start
assert measured_timeout >= expected_timeout - TIMEOUT_MEASUREMENT_EPS
# And exception should reflect connection attempts:
_check_exception(exception, repeats)
@pytest.mark.parametrize(
('first_user', 'node_name', 'query_base'),
tuple(itertools.product(EXPECTED_BEHAVIOR, NODES, SELECTS_SQL)),
)
def test_reconnect(started_cluster, node_name, first_user, query_base):
node = NODES[node_name]
# Everything is up, select should work:
assert TSV(node.query(SELECTS_SQL[query_base],
user=first_user)) == TSV('node1\nnode2')
with PartitionManager() as pm:
# Break the connection.
pm.partition_instances(*NODES.values())
# Now it shouldn't:
_check_timeout_and_exception(node, first_user, query_base)
# The other user should have a different timeout and exception
_check_timeout_and_exception(
node,
'default' if first_user != 'default' else 'ready_to_wait',
query_base,
)
# select should work again:
assert TSV(node.query(SELECTS_SQL[query_base],
user=first_user)) == TSV('node1\nnode2')

View File

@ -0,0 +1,18 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,41 @@
from contextlib import contextmanager
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'])
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for node in (node1, node2):
node.query('''CREATE TABLE local_table(id UInt32, val String) ENGINE = MergeTree ORDER BY id;''')
node1.query('''CREATE TABLE distributed_table(id UInt32, val String) ENGINE = Distributed(test_cluster, default, local_table, id);''')
yield cluster
finally:
cluster.shutdown()
def test_start_and_stop_replica_send(started_cluster):
node1.query("SYSTEM STOP DISTRIBUTED SENDS distributed_table;")
node1.query("INSERT INTO distributed_table VALUES (0, 'node1')")
node1.query("INSERT INTO distributed_table VALUES (1, 'node2')")
# While distributed sends are stopped, only the locally stored row is visible
assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '1'
node1.query("SYSTEM START DISTRIBUTED SENDS distributed_table;")
node1.query("SYSTEM FLUSH DISTRIBUTED distributed_table;")
assert node1.query("SELECT COUNT() FROM distributed_table").rstrip() == '2'

View File

@ -0,0 +1,163 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=`$CLICKHOUSE_CLIENT $2 --query="$1" 2>&1`
if [ "$?" == 0 ]; then
echo -n $result
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.dst_r1;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.dst_r2;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.dst_r1 (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/dst_1', '1') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.dst_r2 (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/dst_1', '2') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.src VALUES (0, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.src VALUES (1, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.src VALUES (1, '1', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.src VALUES (2, '0', 1);"
$CLICKHOUSE_CLIENT --query="SELECT 'Initial';"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.dst_r1 VALUES (0, '1', 2);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.dst_r1 VALUES (1, '1', 2), (1, '2', 2);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.dst_r1 VALUES (2, '1', 2);"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.src;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT 'REPLACE simple';"
query_with_retry "ALTER TABLE test.dst_r1 REPLACE PARTITION 1 FROM test.src;"
query_with_retry "ALTER TABLE test.src DROP PARTITION 1;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.src;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT 'REPLACE empty';"
query_with_retry "ALTER TABLE test.src DROP PARTITION 1;"
query_with_retry "ALTER TABLE test.dst_r1 REPLACE PARTITION 1 FROM test.src;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT 'REPLACE recursive';"
query_with_retry "ALTER TABLE test.dst_r1 DROP PARTITION 1;"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.dst_r1 VALUES (1, '1', 2), (1, '2', 2);"
$CLICKHOUSE_CLIENT --query="CREATE table test_block_numbers (m UInt64) ENGINE MergeTree() ORDER BY tuple();"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_block_numbers SELECT max(max_block_number) AS m FROM system.parts WHERE database='test' AND table='dst_r1' AND active AND name LIKE '1_%';"
query_with_retry "ALTER TABLE test.dst_r1 REPLACE PARTITION 1 FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r2;"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_block_numbers SELECT max(max_block_number) AS m FROM system.parts WHERE database='test' AND table='dst_r1' AND active AND name LIKE '1_%';"
$CLICKHOUSE_CLIENT --query="SELECT (max(m) - min(m) > 1) AS new_block_is_generated FROM test_block_numbers;"
$CLICKHOUSE_CLIENT --query="DROP TEMPORARY TABLE test_block_numbers;"
$CLICKHOUSE_CLIENT --query="SELECT 'ATTACH FROM';"
query_with_retry "ALTER TABLE test.dst_r1 DROP PARTITION 1;"
$CLICKHOUSE_CLIENT --query="DROP TABLE test.src;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.src VALUES (1, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.src VALUES (1, '1', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.dst_r2 VALUES (1, '1', 2);"
query_with_retry "ALTER TABLE test.dst_r2 ATTACH PARTITION 1 FROM test.src;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT 'REPLACE with fetch';"
$CLICKHOUSE_CLIENT --query="DROP TABLE test.src;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.src VALUES (1, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.src VALUES (1, '1', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.dst_r1 VALUES (1, '1', 2);" -- trash part to be
# Stop replication at the second replica and remove source table to use fetch instead of copying
$CLICKHOUSE_CLIENT --query="SYSTEM STOP REPLICATION QUEUES test.dst_r2;"
query_with_retry "ALTER TABLE test.dst_r1 REPLACE PARTITION 1 FROM test.src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE test.src;"
$CLICKHOUSE_CLIENT --query="SYSTEM START REPLICATION QUEUES test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT 'REPLACE with fetch of merged';"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.src;"
query_with_retry "ALTER TABLE test.dst_r1 DROP PARTITION 1;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE test.src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.src VALUES (1, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.src VALUES (1, '1', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test.dst_r1 VALUES (1, '1', 2); -- trash part to be deleted"
$CLICKHOUSE_CLIENT --query="SYSTEM STOP MERGES test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SYSTEM STOP REPLICATION QUEUES test.dst_r2;"
query_with_retry "ALTER TABLE test.dst_r1 REPLACE PARTITION 1 FROM test.src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE test.src;"
# do not wait for other replicas to execute OPTIMIZE
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d), uniqExact(_part) FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst_r1;"
query_with_retry "OPTIMIZE TABLE test.dst_r1 PARTITION 1;" "--replication_alter_partitions_sync=0 --optimize_throw_if_noop=1"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d), uniqExact(_part) FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SYSTEM START REPLICATION QUEUES test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SYSTEM START MERGES test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d), uniqExact(_part) FROM test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT 'After restart';"
$CLICKHOUSE_CLIENT --query="USE test;"
$CLICKHOUSE_CLIENT --query="SYSTEM RESTART REPLICA test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SYSTEM RESTART REPLICAS;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT 'DETACH+ATTACH PARTITION';"
query_with_retry "ALTER TABLE test.dst_r1 DETACH PARTITION 0;"
query_with_retry "ALTER TABLE test.dst_r1 DETACH PARTITION 1;"
query_with_retry "ALTER TABLE test.dst_r1 DETACH PARTITION 2;"
query_with_retry "ALTER TABLE test.dst_r1 ATTACH PARTITION 1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r1;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst_r2;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.dst_r1;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.dst_r2;"

View File

@ -1,141 +0,0 @@
DROP TABLE IF EXISTS test.src;
DROP TABLE IF EXISTS test.dst_r1;
DROP TABLE IF EXISTS test.dst_r2;
CREATE TABLE test.src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;
CREATE TABLE test.dst_r1 (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/dst_1', '1') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;
CREATE TABLE test.dst_r2 (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/dst_1', '2') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;
INSERT INTO test.src VALUES (0, '0', 1);
INSERT INTO test.src VALUES (1, '0', 1);
INSERT INTO test.src VALUES (1, '1', 1);
INSERT INTO test.src VALUES (2, '0', 1);
SELECT 'Initial';
INSERT INTO test.dst_r1 VALUES (0, '1', 2);
INSERT INTO test.dst_r1 VALUES (1, '1', 2), (1, '2', 2);
INSERT INTO test.dst_r1 VALUES (2, '1', 2);
SYSTEM SYNC REPLICA test.dst_r2;
SELECT count(), sum(d) FROM test.src;
SELECT count(), sum(d) FROM test.dst_r1;
SELECT count(), sum(d) FROM test.dst_r2;
SELECT 'REPLACE simple';
ALTER TABLE test.dst_r1 REPLACE PARTITION 1 FROM test.src;
ALTER TABLE test.src DROP PARTITION 1;
SYSTEM SYNC REPLICA test.dst_r2;
SELECT count(), sum(d) FROM test.src;
SELECT count(), sum(d) FROM test.dst_r1;
SELECT count(), sum(d) FROM test.dst_r2;
SELECT 'REPLACE empty';
ALTER TABLE test.src DROP PARTITION 1;
ALTER TABLE test.dst_r1 REPLACE PARTITION 1 FROM test.src;
SYSTEM SYNC REPLICA test.dst_r2;
SELECT count(), sum(d) FROM test.dst_r1;
SELECT count(), sum(d) FROM test.dst_r2;
SELECT 'REPLACE recursive';
ALTER TABLE test.dst_r1 DROP PARTITION 1;
INSERT INTO test.dst_r1 VALUES (1, '1', 2), (1, '2', 2);
CREATE TEMPORARY table test_block_numbers (m UInt64);
INSERT INTO test_block_numbers SELECT max(max_block_number) AS m FROM system.parts WHERE database='test' AND table='dst_r1' AND active AND name LIKE '1_%';
ALTER TABLE test.dst_r1 REPLACE PARTITION 1 FROM test.dst_r1;
SYSTEM SYNC REPLICA test.dst_r2;
SELECT count(), sum(d) FROM test.dst_r1;
SELECT count(), sum(d) FROM test.dst_r2;
INSERT INTO test_block_numbers SELECT max(max_block_number) AS m FROM system.parts WHERE database='test' AND table='dst_r1' AND active AND name LIKE '1_%';
SELECT (max(m) - min(m) > 1) AS new_block_is_generated FROM test_block_numbers;
DROP TEMPORARY TABLE test_block_numbers;
SELECT 'ATTACH FROM';
ALTER TABLE test.dst_r1 DROP PARTITION 1;
DROP TABLE test.src;
CREATE TABLE test.src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;
INSERT INTO test.src VALUES (1, '0', 1);
INSERT INTO test.src VALUES (1, '1', 1);
INSERT INTO test.dst_r2 VALUES (1, '1', 2);
ALTER TABLE test.dst_r2 ATTACH PARTITION 1 FROM test.src;
SYSTEM SYNC REPLICA test.dst_r1;
SELECT count(), sum(d) FROM test.dst_r1;
SELECT count(), sum(d) FROM test.dst_r2;
SELECT 'REPLACE with fetch';
DROP TABLE test.src;
CREATE TABLE test.src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;
INSERT INTO test.src VALUES (1, '0', 1);
INSERT INTO test.src VALUES (1, '1', 1);
INSERT INTO test.dst_r1 VALUES (1, '1', 2); -- trash part to be deleted
-- Stop replication at the second replica and remove source table to use fetch instead of copying
SYSTEM STOP REPLICATION QUEUES test.dst_r2;
ALTER TABLE test.dst_r1 REPLACE PARTITION 1 FROM test.src;
DROP TABLE test.src;
SYSTEM START REPLICATION QUEUES test.dst_r2;
SYSTEM SYNC REPLICA test.dst_r2;
SELECT count(), sum(d) FROM test.dst_r1;
SELECT count(), sum(d) FROM test.dst_r2;
SELECT 'REPLACE with fetch of merged';
DROP TABLE IF EXISTS test.src;
ALTER TABLE test.dst_r1 DROP PARTITION 1;
CREATE TABLE test.src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;
INSERT INTO test.src VALUES (1, '0', 1);
INSERT INTO test.src VALUES (1, '1', 1);
INSERT INTO test.dst_r1 VALUES (1, '1', 2); -- trash part to be deleted
SYSTEM STOP MERGES test.dst_r2;
SYSTEM STOP REPLICATION QUEUES test.dst_r2;
ALTER TABLE test.dst_r1 REPLACE PARTITION 1 FROM test.src;
DROP TABLE test.src;
-- do not wait other replicas to execute OPTIMIZE
SET replication_alter_partitions_sync=0, optimize_throw_if_noop=1;
SELECT count(), sum(d), uniqExact(_part) FROM test.dst_r1;
SYSTEM SYNC REPLICA test.dst_r1;
OPTIMIZE TABLE test.dst_r1 PARTITION 1;
SET replication_alter_partitions_sync=1;
SYSTEM SYNC REPLICA test.dst_r1;
SELECT count(), sum(d), uniqExact(_part) FROM test.dst_r1;
SYSTEM START REPLICATION QUEUES test.dst_r2;
SYSTEM START MERGES test.dst_r2;
SYSTEM SYNC REPLICA test.dst_r2;
SELECT count(), sum(d), uniqExact(_part) FROM test.dst_r2;
SELECT 'After restart';
USE test;
SYSTEM RESTART REPLICA dst_r1;
SYSTEM RESTART REPLICAS;
SELECT count(), sum(d) FROM test.dst_r1;
SELECT count(), sum(d) FROM test.dst_r2;
SELECT 'DETACH+ATTACH PARTITION';
ALTER TABLE test.dst_r1 DETACH PARTITION 0;
ALTER TABLE test.dst_r1 DETACH PARTITION 1;
ALTER TABLE test.dst_r1 DETACH PARTITION 2;
ALTER TABLE test.dst_r1 ATTACH PARTITION 1;
SELECT count(), sum(d) FROM test.dst_r1;
SYSTEM SYNC REPLICA test.dst_r2;
SELECT count(), sum(d) FROM test.dst_r2;
DROP TABLE IF EXISTS test.src;
DROP TABLE IF EXISTS test.dst_r1;
DROP TABLE IF EXISTS test.dst_r2;

View File

@ -0,0 +1,7 @@
0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0
1 1 1 1 1 1 1 1
42 42 42 42 42 42 42 42
42 42 42 42 42 42 42 42
42 42 42 42 42 42 42 42
42 42 42 42 42 42 42 42

View File

@ -0,0 +1,101 @@
DROP TABLE IF EXISTS test.t64;
CREATE TABLE test.t64
(
u8 UInt8,
t_u8 UInt8 Codec(T64, ZSTD),
u16 UInt16,
t_u16 UInt16 Codec(T64, ZSTD),
u32 UInt32,
t_u32 UInt32 Codec(T64, ZSTD),
u64 UInt64,
t_u64 UInt64 Codec(T64, ZSTD)
) ENGINE MergeTree() ORDER BY tuple();
INSERT INTO test.t64 SELECT number AS x, x, x, x, x, x, x, x FROM numbers(1);
INSERT INTO test.t64 SELECT number AS x, x, x, x, x, x, x, x FROM numbers(2);
INSERT INTO test.t64 SELECT 42 AS x, x, x, x, x, x, x, x FROM numbers(4);
SELECT * FROM test.t64 ORDER BY u64;
INSERT INTO test.t64 SELECT number AS x, x, x, x, x, x, x, x FROM numbers(intExp2(8));
INSERT INTO test.t64 SELECT number AS x, x, x, x, x, x, x, x FROM numbers(intExp2(9));
SELECT * FROM test.t64 WHERE u8 != t_u8;
SELECT * FROM test.t64 WHERE u16 != t_u16;
SELECT * FROM test.t64 WHERE u32 != t_u32;
SELECT * FROM test.t64 WHERE u64 != t_u64;
INSERT INTO test.t64 SELECT (intExp2(16) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(16) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(11);
INSERT INTO test.t64 SELECT (intExp2(16) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(64);
INSERT INTO test.t64 SELECT (intExp2(16) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(65);
INSERT INTO test.t64 SELECT (intExp2(16) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(65);
SELECT * FROM test.t64 WHERE u8 != t_u8;
SELECT * FROM test.t64 WHERE u16 != t_u16;
SELECT * FROM test.t64 WHERE u32 != t_u32;
SELECT * FROM test.t64 WHERE u64 != t_u64;
INSERT INTO test.t64 SELECT (intExp2(24) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(24) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(11);
INSERT INTO test.t64 SELECT (intExp2(24) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(128);
INSERT INTO test.t64 SELECT (intExp2(24) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(129);
INSERT INTO test.t64 SELECT (intExp2(24) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(129);
SELECT * FROM test.t64 WHERE u8 != t_u8;
SELECT * FROM test.t64 WHERE u16 != t_u16;
SELECT * FROM test.t64 WHERE u32 != t_u32;
SELECT * FROM test.t64 WHERE u64 != t_u64;
INSERT INTO test.t64 SELECT (intExp2(32) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(32) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
INSERT INTO test.t64 SELECT (intExp2(32) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(256);
INSERT INTO test.t64 SELECT (intExp2(32) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(257);
INSERT INTO test.t64 SELECT (intExp2(32) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(257);
SELECT * FROM test.t64 WHERE u8 != t_u8;
SELECT * FROM test.t64 WHERE u16 != t_u16;
SELECT * FROM test.t64 WHERE u32 != t_u32;
SELECT * FROM test.t64 WHERE u64 != t_u64;
INSERT INTO test.t64 SELECT (intExp2(40) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(40) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
INSERT INTO test.t64 SELECT (intExp2(40) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(512);
INSERT INTO test.t64 SELECT (intExp2(40) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(513);
INSERT INTO test.t64 SELECT (intExp2(40) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(513);
SELECT * FROM test.t64 WHERE u8 != t_u8;
SELECT * FROM test.t64 WHERE u16 != t_u16;
SELECT * FROM test.t64 WHERE u32 != t_u32;
SELECT * FROM test.t64 WHERE u64 != t_u64;
INSERT INTO test.t64 SELECT (intExp2(48) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(48) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
INSERT INTO test.t64 SELECT (intExp2(48) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(1024);
INSERT INTO test.t64 SELECT (intExp2(48) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(1025);
INSERT INTO test.t64 SELECT (intExp2(48) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(1025);
SELECT * FROM test.t64 WHERE u8 != t_u8;
SELECT * FROM test.t64 WHERE u16 != t_u16;
SELECT * FROM test.t64 WHERE u32 != t_u32;
SELECT * FROM test.t64 WHERE u64 != t_u64;
INSERT INTO test.t64 SELECT (intExp2(56) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(56) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
INSERT INTO test.t64 SELECT (intExp2(56) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(2048);
INSERT INTO test.t64 SELECT (intExp2(56) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(2049);
INSERT INTO test.t64 SELECT (intExp2(56) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(2049);
SELECT * FROM test.t64 WHERE u8 != t_u8;
SELECT * FROM test.t64 WHERE u16 != t_u16;
SELECT * FROM test.t64 WHERE u32 != t_u32;
SELECT * FROM test.t64 WHERE u64 != t_u64;
INSERT INTO test.t64 SELECT (intExp2(63) + number * intExp2(62)) AS x, x, x, x, x, x, x, x FROM numbers(10);
SELECT * FROM test.t64 WHERE u8 != t_u8;
SELECT * FROM test.t64 WHERE u16 != t_u16;
SELECT * FROM test.t64 WHERE u32 != t_u32;
SELECT * FROM test.t64 WHERE u64 != t_u64;
OPTIMIZE TABLE test.t64 FINAL;
SELECT * FROM test.t64 WHERE u8 != t_u8;
SELECT * FROM test.t64 WHERE u16 != t_u16;
SELECT * FROM test.t64 WHERE u32 != t_u32;
SELECT * FROM test.t64 WHERE u64 != t_u64;
DROP TABLE test.t64;
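
The test above is self-checking: every value is stored twice, plain and T64-encoded, so any lossy roundtrip shows up as a non-empty result set. The same check in compact form (run against test.t64 before the final DROP):

SELECT count() FROM test.t64 WHERE u64 != t_u64;  -- must return 0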

View File

@ -0,0 +1,9 @@
-1 -1 -1 -1 -1 -1 -1 -1
-1 -1 -1 -1 -1 -1 -1 -1
0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0
1 1 1 1 1 1 1 1
42 42 42 42 42 42 42 42
42 42 42 42 42 42 42 42
42 42 42 42 42 42 42 42
42 42 42 42 42 42 42 42

View File

@ -0,0 +1,128 @@
DROP TABLE IF EXISTS test.t64;
CREATE TABLE test.t64
(
i8 Int8,
t_i8 Int8 Codec(T64, LZ4),
i16 Int16,
t_i16 Int16 Codec(T64, LZ4),
i32 Int32,
t_i32 Int32 Codec(T64, LZ4),
i64 Int64,
t_i64 Int64 Codec(T64, LZ4)
) ENGINE MergeTree() ORDER BY tuple();
INSERT INTO test.t64 SELECT toInt32(number)-1 AS x, x, x, x, x, x, x, x FROM numbers(2);
INSERT INTO test.t64 SELECT toInt32(number)-1 AS x, x, x, x, x, x, x, x FROM numbers(3);
INSERT INTO test.t64 SELECT 42 AS x, x, x, x, x, x, x, x FROM numbers(4);
SELECT * FROM test.t64 ORDER BY i64;
INSERT INTO test.t64 SELECT (intExp2(8) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
SELECT i8, t_i8 FROM test.t64 WHERE i8 != t_i8;
INSERT INTO test.t64 SELECT number AS x, x, x, x, x, x, x, x FROM numbers(intExp2(8));
INSERT INTO test.t64 SELECT number AS x, x, x, x, x, x, x, x FROM numbers(intExp2(9));
SELECT * FROM test.t64 WHERE i8 != t_i8;
SELECT * FROM test.t64 WHERE i16 != t_i16;
SELECT * FROM test.t64 WHERE i32 != t_i32;
SELECT * FROM test.t64 WHERE i64 != t_i64;
INSERT INTO test.t64 SELECT (intExp2(16) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(16) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(11);
INSERT INTO test.t64 SELECT (intExp2(16) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(64);
INSERT INTO test.t64 SELECT (intExp2(16) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(65);
INSERT INTO test.t64 SELECT (intExp2(16) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(65);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(16)) + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(16)) + number) AS x, x, x, x, x, x, x, x FROM numbers(11);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(16)) + number) AS x, x, x, x, x, x, x, x FROM numbers(64);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(16)) + number) AS x, x, x, x, x, x, x, x FROM numbers(65);
INSERT INTO test.t64 SELECT (1 - toInt64(intExp2(16)) + number) AS x, x, x, x, x, x, x, x FROM numbers(65);
SELECT * FROM test.t64 WHERE i8 != t_i8;
SELECT * FROM test.t64 WHERE i16 != t_i16;
SELECT * FROM test.t64 WHERE i32 != t_i32;
SELECT * FROM test.t64 WHERE i64 != t_i64;
INSERT INTO test.t64 SELECT (intExp2(24) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(24) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(11);
INSERT INTO test.t64 SELECT (intExp2(24) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(128);
INSERT INTO test.t64 SELECT (intExp2(24) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(129);
INSERT INTO test.t64 SELECT (intExp2(24) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(129);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(24)) + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(24)) + number) AS x, x, x, x, x, x, x, x FROM numbers(11);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(24)) + number) AS x, x, x, x, x, x, x, x FROM numbers(128);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(24)) + number) AS x, x, x, x, x, x, x, x FROM numbers(129);
INSERT INTO test.t64 SELECT (1 - toInt64(intExp2(24)) + number) AS x, x, x, x, x, x, x, x FROM numbers(129);
SELECT * FROM test.t64 WHERE i8 != t_i8;
SELECT * FROM test.t64 WHERE i16 != t_i16;
SELECT * FROM test.t64 WHERE i32 != t_i32;
SELECT * FROM test.t64 WHERE i64 != t_i64;
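-- Values around ±2^32.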
INSERT INTO test.t64 SELECT (intExp2(32) - 2 + number) AS x, x, x, x, x, x, x, x FROM numbers(2);
INSERT INTO test.t64 SELECT (intExp2(32) - 2 + number) AS x, x, x, x, x, x, x, x FROM numbers(3);
INSERT INTO test.t64 SELECT (intExp2(32) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(64);
INSERT INTO test.t64 SELECT (intExp2(32) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(65);
INSERT INTO test.t64 SELECT (intExp2(32) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(65);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(32)) + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(32)) + number) AS x, x, x, x, x, x, x, x FROM numbers(11);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(32)) + number) AS x, x, x, x, x, x, x, x FROM numbers(64);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(32)) + number) AS x, x, x, x, x, x, x, x FROM numbers(65);
INSERT INTO test.t64 SELECT (1 - toInt64(intExp2(32)) + number) AS x, x, x, x, x, x, x, x FROM numbers(65);
SELECT * FROM test.t64 WHERE i8 != t_i8;
SELECT * FROM test.t64 WHERE i16 != t_i16;
SELECT * FROM test.t64 WHERE i32 != t_i32;
SELECT * FROM test.t64 WHERE i64 != t_i64;
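-- Values around ±2^40.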
INSERT INTO test.t64 SELECT (intExp2(40) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(40) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
INSERT INTO test.t64 SELECT (intExp2(40) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(512);
INSERT INTO test.t64 SELECT (intExp2(40) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(513);
INSERT INTO test.t64 SELECT (intExp2(40) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(513);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(40)) + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(40)) + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(40)) + number) AS x, x, x, x, x, x, x, x FROM numbers(512);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(40)) + number) AS x, x, x, x, x, x, x, x FROM numbers(513);
INSERT INTO test.t64 SELECT (1 - toInt64(intExp2(40)) + number) AS x, x, x, x, x, x, x, x FROM numbers(513);
SELECT * FROM test.t64 WHERE i8 != t_i8;
SELECT * FROM test.t64 WHERE i16 != t_i16;
SELECT * FROM test.t64 WHERE i32 != t_i32;
SELECT * FROM test.t64 WHERE i64 != t_i64;
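-- Values around ±2^48.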
INSERT INTO test.t64 SELECT (intExp2(48) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(48) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
INSERT INTO test.t64 SELECT (intExp2(48) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(1024);
INSERT INTO test.t64 SELECT (intExp2(48) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(1025);
INSERT INTO test.t64 SELECT (intExp2(48) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(1025);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(48)) + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(48)) + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(48)) + number) AS x, x, x, x, x, x, x, x FROM numbers(1024);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(48)) + number) AS x, x, x, x, x, x, x, x FROM numbers(1025);
INSERT INTO test.t64 SELECT (1 - toInt64(intExp2(48)) + number) AS x, x, x, x, x, x, x, x FROM numbers(1025);
SELECT * FROM test.t64 WHERE i8 != t_i8;
SELECT * FROM test.t64 WHERE i16 != t_i16;
SELECT * FROM test.t64 WHERE i32 != t_i32;
SELECT * FROM test.t64 WHERE i64 != t_i64;
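-- Values around ±2^56.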
INSERT INTO test.t64 SELECT (intExp2(56) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (intExp2(56) - 10 + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
INSERT INTO test.t64 SELECT (intExp2(56) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(2048);
INSERT INTO test.t64 SELECT (intExp2(56) - 64 + number) AS x, x, x, x, x, x, x, x FROM numbers(2049);
INSERT INTO test.t64 SELECT (intExp2(56) - 1 + number) AS x, x, x, x, x, x, x, x FROM numbers(2049);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(56)) + number) AS x, x, x, x, x, x, x, x FROM numbers(10);
INSERT INTO test.t64 SELECT (10 - toInt64(intExp2(56)) + number) AS x, x, x, x, x, x, x, x FROM numbers(20);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(56)) + number) AS x, x, x, x, x, x, x, x FROM numbers(2048);
INSERT INTO test.t64 SELECT (64 - toInt64(intExp2(56)) + number) AS x, x, x, x, x, x, x, x FROM numbers(2049);
INSERT INTO test.t64 SELECT (1 - toInt64(intExp2(56)) + number) AS x, x, x, x, x, x, x, x FROM numbers(2049);
SELECT * FROM test.t64 WHERE i8 != t_i8;
SELECT * FROM test.t64 WHERE i16 != t_i16;
SELECT * FROM test.t64 WHERE i32 != t_i32;
SELECT * FROM test.t64 WHERE i64 != t_i64;
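-- Merge all parts and re-check after recompression.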
OPTIMIZE TABLE test.t64 FINAL;
SELECT * FROM test.t64 WHERE i8 != t_i8;
SELECT * FROM test.t64 WHERE i16 != t_i16;
SELECT * FROM test.t64 WHERE i32 != t_i32;
SELECT * FROM test.t64 WHERE i64 != t_i64;
DROP TABLE test.t64;

View File

@ -263,13 +263,14 @@ Default value: 1.
Sets default strictness for [JOIN clauses](../../query_language/select.md#select-join).
**Possible values**
Possible values:
- `ALL` — If the right table has several matching rows, the data is multiplied by the number of these rows. This is the normal `JOIN` behavior from standard SQL.
- `ALL` — If the right table has several matching rows, ClickHouse creates a [Cartesian product](https://en.wikipedia.org/wiki/Cartesian_product) from matching rows. This is the normal `JOIN` behavior from standard SQL.
- `ANY` — If the right table has several matching rows, only the first one found is joined. If the right table has only one matching row, the results of `ANY` and `ALL` are the same.
- `ASOF` — For joining sequences with a non-exact match.
- `Empty string` — If `ALL` or `ANY` is not specified in the query, ClickHouse throws an exception.
**Default value**: `ALL`
Default value: `ALL`.
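For example, a minimal sketch (with hypothetical tables `t1` and `t2`) of how the setting supplies the strictness a query omits:

```
SET join_default_strictness = 'ALL';
-- Executed as ALL INNER JOIN, because the query doesn't specify the strictness keyword:
SELECT * FROM t1 INNER JOIN t2 USING (id);
```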
## join_any_take_last_row {#settings-join_any_take_last_row}
@ -295,12 +296,24 @@ Default value: 0.
Sets the type of [JOIN](../../query_language/select.md) behavior. When merging tables, empty cells may appear. ClickHouse fills them differently based on this setting.
**Possible values**
Possible values:
- 0 — The empty cells are filled with the default value of the corresponding field type.
- 1 — `JOIN` behaves the same way as in standard SQL. The type of the corresponding field is converted to [Nullable](../../data_types/nullable.md#data_type-nullable), and empty cells are filled with [NULL](../../query_language/syntax.md).
**Default value**: 0.
Default value: 0.
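A minimal sketch of the difference, assuming hypothetical tables `a` and `b` where `b` has no match for one of the keys:

```
CREATE TABLE a (id UInt32) ENGINE = Memory;
CREATE TABLE b (id UInt32, score UInt32) ENGINE = Memory;
INSERT INTO a VALUES (1), (2);
INSERT INTO b VALUES (1, 10);

SET join_use_nulls = 0;
SELECT id, score FROM a ANY LEFT JOIN b USING (id); -- the unmatched row gets score = 0

SET join_use_nulls = 1;
SELECT id, score FROM a ANY LEFT JOIN b USING (id); -- the unmatched row gets score = NULL, and the column type becomes Nullable(UInt32)
```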
## join_any_take_last_row {#settings-join_any_take_last_row}
Changes the behavior of `ANY JOIN`. When disabled, `ANY JOIN` takes the first row found for a key. When enabled, `ANY JOIN` takes the last matched row if there are multiple rows for the same key. The setting is used only with the [Join table engine](../table_engines/join.md).
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: 1.
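A sketch with a hypothetical [Join](../table_engines/join.md) engine table `j`:

```
CREATE TABLE j (k UInt32, v String)
ENGINE = Join(ANY, LEFT, k) SETTINGS join_any_take_last_row = 1;
INSERT INTO j VALUES (1, 'first'), (1, 'second');
-- The row stored for k = 1 is (1, 'second'); with join_any_take_last_row = 0 it would be (1, 'first').
```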
## max_block_size

View File

@ -26,11 +26,12 @@ CREATE TABLE join_any_left_null ( ... ) ENGINE = Join(ANY, LEFT, ...) SETTINGS j
```
The following settings are supported by the `Join` engine:
* `join_use_nulls`
* `max_rows_in_join`
* `max_bytes_in_join`
* `join_overflow_mode`
* `join_any_take_last_row`
- [join_use_nulls](../settings/settings.md#settings-join_use_nulls)
- `max_rows_in_join`
- `max_bytes_in_join`
- [join_overflow_mode](../settings/settings.md#settings-join_overflow_mode)
- [join_any_take_last_row](../settings/settings.md#settings-join_any_take_last_row)
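For instance, a hypothetical table that combines several of these settings:

```
CREATE TABLE joined (k UInt32, v String)
ENGINE = Join(ANY, LEFT, k)
SETTINGS join_use_nulls = 1, join_any_take_last_row = 1;
```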
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/join/) <!--hide-->

View File

@ -469,10 +469,43 @@ Don't mix these syntaxes.
ClickHouse doesn't directly support syntax with commas, so we don't recommend using them. The algorithm tries to rewrite the query in terms of `CROSS JOIN` and `INNER JOIN` clauses and then proceeds to query processing. When rewriting the query, ClickHouse tries to optimize performance and memory consumption. By default, ClickHouse treats commas as an `INNER JOIN` clause and converts `INNER JOIN` to `CROSS JOIN` when the algorithm cannot guarantee that `INNER JOIN` returns the required data.
#### ANY or ALL Strictness
#### Strictness
If `ALL` is specified and the right table has several matching rows, the data will be multiplied by the number of these rows. This is the normal `JOIN` behavior for standard SQL.
If `ANY` is specified and the right table has several matching rows, only the first one found is joined. If the right table has only one matching row, the results of `ANY` and `ALL` are the same.
- `ALL` — If the right table has several matching rows, ClickHouse creates a [Cartesian product](https://en.wikipedia.org/wiki/Cartesian_product) from matching rows. This is the normal `JOIN` behavior for standard SQL.
- `ANY` — If the right table has several matching rows, only the first one found is joined. If the right table has only one matching row, the results of queries with `ANY` and `ALL` keywords are the same.
- `ASOF` — For joining sequences with a non-exact match. Usage of `ASOF JOIN` is described below.
**ASOF JOIN Usage**
`ASOF JOIN` is useful when you need to join records that have no exact match. For example, consider the following tables:
```
table_1 table_2
event | ev_time | user_id event | ev_time | user_id
----------|---------|---------- ----------|---------|----------
... ...
event_1_1 | 12:00 | 42 event_2_1 | 11:59 | 42
... event_2_2 | 12:30 | 42
event_1_2 | 13:00 | 42 event_2_3 | 13:00 | 42
... ...
```
`ASOF JOIN` takes the timestamp of a user event from `table_1` and finds the event in `table_2` whose timestamp is closest to (equal to or less than) the timestamp of the event from `table_1`. In our example, `event_1_1` can be joined with `event_2_1` and `event_1_2` can be joined with `event_2_3`, but `event_2_2` cannot be joined.
Tables for `ASOF JOIN` must have an ordered sequence column. This column cannot be the only column in the table. You can use the `UInt32`, `UInt64`, `Float32`, `Float64`, `Date` and `DateTime` data types for this column.
Use the following syntax for `ASOF JOIN`:
```
SELECT expression_list FROM table_1 ASOF JOIN table_2 USING (equi_column1, ..., equi_columnN, asof_column)
```
`ASOF JOIN` uses `equi_columnX` for joining on equality (`user_id` in our example) and `asof_column` for joining on the closest match.
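Applied to the example tables above, a query sketch could look like this (each `table_1` event is matched with the closest `table_2` event for the same `user_id` that is not later than it):

```
SELECT table_1.event, table_2.event, table_1.ev_time
FROM table_1
ASOF JOIN table_2 USING (user_id, ev_time)
```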
Implementation details:
- The `asof_column` must be the last column in the `USING` clause.
- The `ASOF` join is not supported in the [Join](../operations/table_engines/join.md) table engine.
To set the default strictness value, use the session configuration parameter [join_default_strictness](../operations/settings/settings.md#settings-join_default_strictness).

View File

@ -94,7 +94,7 @@
</div>
<div id="announcement" class="colored-block">
<div class="page">
Upcoming ClickHouse Meetup: <a class="announcement-link" href="https://www.huodongxing.com/event/3483759917300" rel="external nofollow" target="_blank">Shenzhen</a> on October 20
Upcoming Meetups: <a class="announcement-link" href="https://events.yandex.ru/events/ClickHouse/26-June-2019/" rel="external nofollow" target="_blank">Novosibirsk</a> on June 26 and <a class="announcement-link" href="https://www.huodongxing.com/event/3483759917300" rel="external nofollow" target="_blank">Shenzhen</a> on October 20
</div>
</div>
<div class="page">