From 5b3ab488610b19f829507f04c7b74b0fec6d52bc Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 11 Dec 2020 01:05:02 +0300 Subject: [PATCH 1/3] More forward declaration for generic headers The following headers are pretty generic, so use forward declaration as much as possible: - Context.h - Settings.h - ConnectionTimeouts.h (Also this shows that some missing some includes -- this has been fixed) And split ConnectionTimeouts.h into ConnectionTimeoutsContext.h (since module part cannot be added for it, due to recursive build dependencies that will be introduced) Also remove Settings from the RemoteBlockInputStream/RemoteQueryExecutor and just pass the context, since settings was passed only in speicifc places, that can allow making a copy of Context (i.e. Copier). Approx results (How much units will be recompiled after changing file X?): - ConnectionTimeouts.h - mainline: 100 - Context.h: - mainline: ~800 - patched: 415 - Settings.h: - mainline: 900-1K - patched: 440 (most of them because of the Context.h) --- programs/benchmark/Benchmark.cpp | 4 ++- programs/copier/ClusterCopier.cpp | 15 +++++++--- programs/copier/ClusterCopier.h | 2 +- programs/copier/ClusterCopierApp.cpp | 1 + programs/copier/TaskCluster.h | 5 +++- src/Client/Connection.cpp | 1 + src/Client/Connection.h | 3 +- src/Client/ConnectionPool.h | 2 +- src/Client/ConnectionPoolWithFailover.cpp | 1 + src/Common/XDBCBridgeHelper.h | 1 + src/DataStreams/RemoteBlockInputStream.cpp | 12 ++++---- src/DataStreams/RemoteBlockInputStream.h | 12 ++++---- src/DataStreams/RemoteQueryExecutor.cpp | 17 ++++------- src/DataStreams/RemoteQueryExecutor.h | 16 +++++----- src/Dictionaries/HTTPDictionarySource.cpp | 1 + src/Dictionaries/XDBCDictionarySource.cpp | 1 + src/Disks/S3/DiskS3.cpp | 2 ++ src/Formats/FormatFactory.h | 1 - src/Functions/FunctionFactory.h | 1 - src/Functions/tcpPort.cpp | 1 + src/IO/ConnectionTimeouts.h | 27 ++++------------- src/IO/ConnectionTimeoutsContext.h | 30 +++++++++++++++++++ src/IO/S3Common.cpp | 1 + src/Interpreters/Cluster.cpp | 1 + src/Interpreters/Cluster.h | 12 +++++++- .../ClusterProxy/SelectStreamFactory.cpp | 5 ++-- src/Interpreters/DDLWorker.h | 1 + src/Interpreters/ExpressionActions.h | 1 - src/Interpreters/ExpressionAnalyzer.cpp | 5 ++++ src/Interpreters/ExpressionAnalyzer.h | 7 ++--- src/Interpreters/InterpreterInsertQuery.cpp | 1 + .../tests/gtest_cycle_aliases.cpp | 1 + src/Parsers/IParser.h | 1 - src/Parsers/ParserSetQuery.cpp | 1 + src/Parsers/ParserSetQuery.h | 2 ++ src/Server/HTTPHandler.cpp | 1 + src/Server/InterserverIOHTTPHandler.cpp | 1 + src/Server/StaticRequestHandler.cpp | 1 + src/Storages/Distributed/DirectoryMonitor.cpp | 1 + .../DistributedBlockOutputStream.cpp | 1 + src/Storages/MergeTree/MergeTreeSettings.cpp | 1 + src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 1 + src/Storages/RabbitMQ/StorageRabbitMQ.h | 3 +- src/Storages/StorageReplicatedMergeTree.cpp | 1 + src/Storages/StorageURL.cpp | 2 ++ src/Storages/StorageURL.h | 5 +++- src/Storages/StorageXDBC.cpp | 1 + .../System/StorageSystemDistributionQueue.cpp | 1 + src/TableFunctions/ITableFunctionXDBC.cpp | 1 + 49 files changed, 139 insertions(+), 77 deletions(-) create mode 100644 src/IO/ConnectionTimeoutsContext.h diff --git a/programs/benchmark/Benchmark.cpp b/programs/benchmark/Benchmark.cpp index 8c69a545017..ae1d16ce402 100644 --- a/programs/benchmark/Benchmark.cpp +++ b/programs/benchmark/Benchmark.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -95,6 +96,7 @@ public: } global_context.makeGlobalContext(); + global_context.setSettings(settings); std::cerr << std::fixed << std::setprecision(3); @@ -404,7 +406,7 @@ private: Stopwatch watch; RemoteBlockInputStream stream( *(*connection_entries[connection_index]), - query, {}, global_context, &settings, nullptr, Scalars(), Tables(), query_processing_stage); + query, {}, global_context, nullptr, Scalars(), Tables(), query_processing_stage); if (!query_id.empty()) stream.setQueryId(query_id); diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp index 2f19fc47fd2..ca09e7c1889 100644 --- a/programs/copier/ClusterCopier.cpp +++ b/programs/copier/ClusterCopier.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB @@ -1588,11 +1589,14 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT LOG_DEBUG(log, "All helping tables dropped partition {}", partition_name); } -String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings) +String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings) { + Context remote_context(context); + remote_context.setSettings(settings); + String query = "SHOW CREATE TABLE " + getQuotedTable(table); Block block = getBlockWithAllStreamData(std::make_shared( - connection, query, InterpreterShowCreateQuery::getSampleBlock(), context, settings)); + connection, query, InterpreterShowCreateQuery::getSampleBlock(), remote_context)); return typeid_cast(*block.safeGetByPosition(0).column).getDataAt(0).toString(); } @@ -1604,7 +1608,7 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time String create_query_pull_str = getRemoteCreateTable( task_shard.task_table.table_pull, *connection_entry, - &task_cluster->settings_pull); + task_cluster->settings_pull); ParserCreateQuery parser_create_query; const auto & settings = context.getSettingsRef(); @@ -1856,6 +1860,9 @@ UInt64 ClusterCopier::executeQueryOnCluster( auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(shard_settings).getSaturated(shard_settings.max_execution_time); auto connections = shard.pool->getMany(timeouts, &shard_settings, pool_mode); + Context shard_context(context); + shard_context.setSettings(shard_settings); + for (auto & connection : connections) { if (connection.isNull()) @@ -1864,7 +1871,7 @@ UInt64 ClusterCopier::executeQueryOnCluster( try { /// CREATE TABLE and DROP PARTITION queries return empty block - RemoteBlockInputStream stream{*connection, query, Block{}, context, &shard_settings}; + RemoteBlockInputStream stream{*connection, query, Block{}, shard_context}; NullBlockOutputStream output{Block{}}; copyData(stream, output); diff --git a/programs/copier/ClusterCopier.h b/programs/copier/ClusterCopier.h index beaf247dfc8..9aff5493cf8 100644 --- a/programs/copier/ClusterCopier.h +++ b/programs/copier/ClusterCopier.h @@ -154,7 +154,7 @@ protected: /// table we can get rid of partition pieces (partitions in helping tables). void dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name); - String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings = nullptr); + String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings); ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard); diff --git a/programs/copier/ClusterCopierApp.cpp b/programs/copier/ClusterCopierApp.cpp index c2946e12c34..e3169a49ecf 100644 --- a/programs/copier/ClusterCopierApp.cpp +++ b/programs/copier/ClusterCopierApp.cpp @@ -1,6 +1,7 @@ #include "ClusterCopierApp.h" #include #include +#include #include #include diff --git a/programs/copier/TaskCluster.h b/programs/copier/TaskCluster.h index 68d98c648f5..5b28f461dd8 100644 --- a/programs/copier/TaskCluster.h +++ b/programs/copier/TaskCluster.h @@ -1,6 +1,7 @@ #pragma once #include "Aliases.h" +#include namespace DB { @@ -12,7 +13,9 @@ namespace ErrorCodes struct TaskCluster { TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_) - : task_zookeeper_path(task_zookeeper_path_), default_local_database(default_local_database_) {} + : task_zookeeper_path(task_zookeeper_path_) + , default_local_database(default_local_database_) + {} void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = ""); diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index f7119195e97..8f4a64766cd 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include diff --git a/src/Client/Connection.h b/src/Client/Connection.h index f4c25001f3e..30a74ec73aa 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -17,7 +18,6 @@ #include -#include #include #include @@ -31,6 +31,7 @@ namespace DB class ClientInfo; class Pipe; +struct Settings; /// Struct which represents data we are going to send for external table. struct ExternalTableData diff --git a/src/Client/ConnectionPool.h b/src/Client/ConnectionPool.h index 736075a4cc1..2389cc6755d 100644 --- a/src/Client/ConnectionPool.h +++ b/src/Client/ConnectionPool.h @@ -1,9 +1,9 @@ #pragma once #include - #include #include +#include namespace DB { diff --git a/src/Client/ConnectionPoolWithFailover.cpp b/src/Client/ConnectionPoolWithFailover.cpp index 68f4bcd1b76..1ca61dc8059 100644 --- a/src/Client/ConnectionPoolWithFailover.cpp +++ b/src/Client/ConnectionPoolWithFailover.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include diff --git a/src/Common/XDBCBridgeHelper.h b/src/Common/XDBCBridgeHelper.h index ed1f63a2507..d7d3a6ba4cc 100644 --- a/src/Common/XDBCBridgeHelper.h +++ b/src/Common/XDBCBridgeHelper.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include diff --git a/src/DataStreams/RemoteBlockInputStream.cpp b/src/DataStreams/RemoteBlockInputStream.cpp index c7c5ce2d00a..a62f7fca0b7 100644 --- a/src/DataStreams/RemoteBlockInputStream.cpp +++ b/src/DataStreams/RemoteBlockInputStream.cpp @@ -6,27 +6,27 @@ namespace DB RemoteBlockInputStream::RemoteBlockInputStream( Connection & connection, - const String & query_, const Block & header_, const Context & context_, const Settings * settings, + const String & query_, const Block & header_, const Context & context_, const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) - : query_executor(connection, query_, header_, context_, settings, throttler, scalars_, external_tables_, stage_) + : query_executor(connection, query_, header_, context_, throttler, scalars_, external_tables_, stage_) { init(); } RemoteBlockInputStream::RemoteBlockInputStream( std::vector && connections, - const String & query_, const Block & header_, const Context & context_, const Settings * settings, + const String & query_, const Block & header_, const Context & context_, const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) - : query_executor(std::move(connections), query_, header_, context_, settings, throttler, scalars_, external_tables_, stage_) + : query_executor(std::move(connections), query_, header_, context_, throttler, scalars_, external_tables_, stage_) { init(); } RemoteBlockInputStream::RemoteBlockInputStream( const ConnectionPoolWithFailoverPtr & pool, - const String & query_, const Block & header_, const Context & context_, const Settings * settings, + const String & query_, const Block & header_, const Context & context_, const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) - : query_executor(pool, query_, header_, context_, settings, throttler, scalars_, external_tables_, stage_) + : query_executor(pool, query_, header_, context_, throttler, scalars_, external_tables_, stage_) { init(); } diff --git a/src/DataStreams/RemoteBlockInputStream.h b/src/DataStreams/RemoteBlockInputStream.h index 628feb0ab80..5ef05ee99eb 100644 --- a/src/DataStreams/RemoteBlockInputStream.h +++ b/src/DataStreams/RemoteBlockInputStream.h @@ -6,7 +6,6 @@ #include #include -#include #include #include #include @@ -16,32 +15,31 @@ namespace DB { +class Context; + /** This class allows one to launch queries on remote replicas of one shard and get results */ class RemoteBlockInputStream : public IBlockInputStream { public: /// Takes already set connection. - /// If `settings` is nullptr, settings will be taken from context. RemoteBlockInputStream( Connection & connection, - const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, + const String & query_, const Block & header_, const Context & context_, const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); /// Accepts several connections already taken from pool. - /// If `settings` is nullptr, settings will be taken from context. RemoteBlockInputStream( std::vector && connections, - const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, + const String & query_, const Block & header_, const Context & context_, const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); /// Takes a pool and gets one or several connections from it. - /// If `settings` is nullptr, settings will be taken from context. RemoteBlockInputStream( const ConnectionPoolWithFailoverPtr & pool, - const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, + const String & query_, const Block & header_, const Context & context_, const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 9abce0edef1..62ce1632e83 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -8,7 +8,9 @@ #include #include #include +#include #include +#include namespace DB { @@ -20,14 +22,11 @@ namespace ErrorCodes RemoteQueryExecutor::RemoteQueryExecutor( Connection & connection, - const String & query_, const Block & header_, const Context & context_, const Settings * settings, + const String & query_, const Block & header_, const Context & context_, ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) : header(header_), query(query_), context(context_) , scalars(scalars_), external_tables(external_tables_), stage(stage_) { - if (settings) - context.setSettings(*settings); - create_multiplexed_connections = [this, &connection, throttler]() { return std::make_unique(connection, context.getSettingsRef(), throttler); @@ -36,14 +35,11 @@ RemoteQueryExecutor::RemoteQueryExecutor( RemoteQueryExecutor::RemoteQueryExecutor( std::vector && connections, - const String & query_, const Block & header_, const Context & context_, const Settings * settings, + const String & query_, const Block & header_, const Context & context_, const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) : header(header_), query(query_), context(context_) , scalars(scalars_), external_tables(external_tables_), stage(stage_) { - if (settings) - context.setSettings(*settings); - create_multiplexed_connections = [this, connections, throttler]() mutable { return std::make_unique( @@ -53,14 +49,11 @@ RemoteQueryExecutor::RemoteQueryExecutor( RemoteQueryExecutor::RemoteQueryExecutor( const ConnectionPoolWithFailoverPtr & pool, - const String & query_, const Block & header_, const Context & context_, const Settings * settings, + const String & query_, const Block & header_, const Context & context_, const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_) : header(header_), query(query_), context(context_) , scalars(scalars_), external_tables(external_tables_), stage(stage_) { - if (settings) - context.setSettings(*settings); - create_multiplexed_connections = [this, pool, throttler]() { const Settings & current_settings = context.getSettingsRef(); diff --git a/src/DataStreams/RemoteQueryExecutor.h b/src/DataStreams/RemoteQueryExecutor.h index 0db0e0218be..eb03472504d 100644 --- a/src/DataStreams/RemoteQueryExecutor.h +++ b/src/DataStreams/RemoteQueryExecutor.h @@ -1,12 +1,15 @@ #pragma once -#include #include #include +#include +#include namespace DB { +class Context; + class Throttler; using ThrottlerPtr = std::shared_ptr; @@ -21,26 +24,23 @@ class RemoteQueryExecutor { public: /// Takes already set connection. - /// If `settings` is nullptr, settings will be taken from context. RemoteQueryExecutor( Connection & connection, - const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, + const String & query_, const Block & header_, const Context & context_, ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); /// Accepts several connections already taken from pool. - /// If `settings` is nullptr, settings will be taken from context. RemoteQueryExecutor( std::vector && connections, - const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, + const String & query_, const Block & header_, const Context & context_, const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); /// Takes a pool and gets one or several connections from it. - /// If `settings` is nullptr, settings will be taken from context. RemoteQueryExecutor( const ConnectionPoolWithFailoverPtr & pool, - const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, + const String & query_, const Block & header_, const Context & context_, const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); @@ -93,7 +93,7 @@ private: const String query; String query_id = ""; - Context context; + const Context & context; ProgressCallback progress_callback; ProfileInfoCallback profile_info_callback; diff --git a/src/Dictionaries/HTTPDictionarySource.cpp b/src/Dictionaries/HTTPDictionarySource.cpp index 18a97f34486..67bd8462036 100644 --- a/src/Dictionaries/HTTPDictionarySource.cpp +++ b/src/Dictionaries/HTTPDictionarySource.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Dictionaries/XDBCDictionarySource.cpp b/src/Dictionaries/XDBCDictionarySource.cpp index 832c30ed4b7..89df4b606fe 100644 --- a/src/Dictionaries/XDBCDictionarySource.cpp +++ b/src/Dictionaries/XDBCDictionarySource.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index 6090f00b4e2..abe4bf610ad 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -17,6 +17,8 @@ #include #include #include +#include +#include #include #include diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 3b0811d579a..0fe6f19f0b7 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -16,7 +16,6 @@ namespace DB class Block; class Context; -struct FormatSettings; struct Settings; struct FormatFactorySettings; diff --git a/src/Functions/FunctionFactory.h b/src/Functions/FunctionFactory.h index 7872c192b41..7990e78daf8 100644 --- a/src/Functions/FunctionFactory.h +++ b/src/Functions/FunctionFactory.h @@ -2,7 +2,6 @@ #include #include -#include #include #include diff --git a/src/Functions/tcpPort.cpp b/src/Functions/tcpPort.cpp index 52acf0ade54..26991c900ab 100644 --- a/src/Functions/tcpPort.cpp +++ b/src/Functions/tcpPort.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace DB diff --git a/src/IO/ConnectionTimeouts.h b/src/IO/ConnectionTimeouts.h index 9e87dee4fc3..e5efabee6e2 100644 --- a/src/IO/ConnectionTimeouts.h +++ b/src/IO/ConnectionTimeouts.h @@ -1,14 +1,13 @@ #pragma once #include -#include - -#include -#include namespace DB { +class Context; +struct Settings; + struct ConnectionTimeouts { Poco::Timespan connection_timeout; @@ -92,24 +91,10 @@ struct ConnectionTimeouts } /// Timeouts for the case when we have just single attempt to connect. - static ConnectionTimeouts getTCPTimeoutsWithoutFailover(const Settings & settings) - { - return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout); - } - + static ConnectionTimeouts getTCPTimeoutsWithoutFailover(const Settings & settings); /// Timeouts for the case when we will try many addresses in a loop. - static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings) - { - return ConnectionTimeouts(settings.connect_timeout_with_failover_ms, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout, 0, settings.connect_timeout_with_failover_secure_ms); - } - - static ConnectionTimeouts getHTTPTimeouts(const Context & context) - { - const auto & settings = context.getSettingsRef(); - const auto & config = context.getConfigRef(); - Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0}; - return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout, settings.tcp_keep_alive_timeout, http_keep_alive_timeout); - } + static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings); + static ConnectionTimeouts getHTTPTimeouts(const Context & context); }; } diff --git a/src/IO/ConnectionTimeoutsContext.h b/src/IO/ConnectionTimeoutsContext.h new file mode 100644 index 00000000000..ce19738f507 --- /dev/null +++ b/src/IO/ConnectionTimeoutsContext.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +/// Timeouts for the case when we have just single attempt to connect. +inline ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithoutFailover(const Settings & settings) +{ + return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout); +} + +/// Timeouts for the case when we will try many addresses in a loop. +inline ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithFailover(const Settings & settings) +{ + return ConnectionTimeouts(settings.connect_timeout_with_failover_ms, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout, 0, settings.connect_timeout_with_failover_secure_ms); +} + +inline ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(const Context & context) +{ + const auto & settings = context.getSettingsRef(); + const auto & config = context.getConfigRef(); + Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0}; + return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout, settings.tcp_keep_alive_timeout, http_keep_alive_timeout); +} + +} diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp index bc49c2641a0..aad6e984568 100644 --- a/src/IO/S3Common.cpp +++ b/src/IO/S3Common.cpp @@ -19,6 +19,7 @@ # include # include # include +# include # include namespace diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 218502e7f43..c9c56c96cbe 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h index 4b6ee35efd5..c64d52724e5 100644 --- a/src/Interpreters/Cluster.h +++ b/src/Interpreters/Cluster.h @@ -1,13 +1,23 @@ #pragma once #include -#include #include #include #include +namespace Poco +{ + namespace Util + { + class AbstractConfiguration; + } +} + namespace DB { + +struct Settings; + namespace ErrorCodes { extern const int LOGICAL_ERROR; diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 56f306595ac..34065c26f3e 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -143,7 +144,7 @@ void SelectStreamFactory::createForShard( auto emplace_remote_stream = [&]() { auto remote_query_executor = std::make_shared( - shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage); + shard_info.pool, modified_query, header, context, throttler, scalars, external_tables, processed_stage); remote_query_executor->setLogger(log); remote_query_executor->setPoolMode(PoolMode::GET_MANY); @@ -288,7 +289,7 @@ void SelectStreamFactory::createForShard( connections.emplace_back(std::move(try_result.entry)); auto remote_query_executor = std::make_shared( - std::move(connections), modified_query, header, context, nullptr, throttler, scalars, external_tables, stage); + std::move(connections), modified_query, header, context, throttler, scalars, external_tables, stage); return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes); } diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index 7dd9c38e9da..18c28c55f1f 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index 2b1aa5e2456..5104f1e8e72 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -2,7 +2,6 @@ #include #include -#include #include #include diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 72d48efdd80..1b93c090842 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -112,6 +112,11 @@ bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column) return true; } +ExpressionAnalyzer::ExtractedSettings::ExtractedSettings(const Settings & settings_) + : use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries) + , size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode) +{} + ExpressionAnalyzer::ExpressionAnalyzer( const ASTPtr & query_, diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index 6e87d0b7250..2567b32e37e 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -1,6 +1,5 @@ #pragma once -#include #include #include #include @@ -16,6 +15,7 @@ namespace DB class Block; class Context; +struct Settings; struct ExpressionActionsChain; class ExpressionActions; @@ -80,10 +80,7 @@ private: const bool use_index_for_in_with_subqueries; const SizeLimits size_limits_for_set; - ExtractedSettings(const Settings & settings_) - : use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries), - size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode) - {} + ExtractedSettings(const Settings & settings_); }; public: diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 39381bf0241..e3fc67b432c 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Interpreters/tests/gtest_cycle_aliases.cpp b/src/Interpreters/tests/gtest_cycle_aliases.cpp index 593db93de3e..56e23c6a497 100644 --- a/src/Interpreters/tests/gtest_cycle_aliases.cpp +++ b/src/Interpreters/tests/gtest_cycle_aliases.cpp @@ -5,6 +5,7 @@ #include #include #include +#include using namespace DB; diff --git a/src/Parsers/IParser.h b/src/Parsers/IParser.h index 05ceb8c900b..7dc31e4c1eb 100644 --- a/src/Parsers/IParser.h +++ b/src/Parsers/IParser.h @@ -5,7 +5,6 @@ #include #include -#include #include #include #include diff --git a/src/Parsers/ParserSetQuery.cpp b/src/Parsers/ParserSetQuery.cpp index 328d01f2f81..aac3a191a10 100644 --- a/src/Parsers/ParserSetQuery.cpp +++ b/src/Parsers/ParserSetQuery.cpp @@ -6,6 +6,7 @@ #include #include +#include namespace DB diff --git a/src/Parsers/ParserSetQuery.h b/src/Parsers/ParserSetQuery.h index 59a6109ea48..0bc1cec3093 100644 --- a/src/Parsers/ParserSetQuery.h +++ b/src/Parsers/ParserSetQuery.h @@ -7,6 +7,8 @@ namespace DB { +struct SettingChange; + /** Query like this: * SET name1 = value1, name2 = value2, ... */ diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index ed154ba65f2..472850950d4 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include diff --git a/src/Server/InterserverIOHTTPHandler.cpp b/src/Server/InterserverIOHTTPHandler.cpp index f4385e8ebc4..973759bedd1 100644 --- a/src/Server/InterserverIOHTTPHandler.cpp +++ b/src/Server/InterserverIOHTTPHandler.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include "IServer.h" namespace DB diff --git a/src/Server/StaticRequestHandler.cpp b/src/Server/StaticRequestHandler.cpp index 7f63099c972..ad2c07ab0aa 100644 --- a/src/Server/StaticRequestHandler.cpp +++ b/src/Server/StaticRequestHandler.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 47da0a10d9e..5d089eb9f80 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 2248c489679..d24967256a0 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Storages/MergeTree/MergeTreeSettings.cpp b/src/Storages/MergeTree/MergeTreeSettings.cpp index 15ff62e0aa6..e77668e8900 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.cpp +++ b/src/Storages/MergeTree/MergeTreeSettings.cpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 1c3b1bbd99c..f41c4805d24 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index f4bb3215b55..a46da6072af 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -2,7 +2,6 @@ #include #include -#include #include #include #include @@ -19,6 +18,8 @@ namespace DB { +class Context; + using ChannelPtr = std::shared_ptr; class StorageRabbitMQ final: public ext::shared_ptr_helper, public IStorage diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index e2bf1592659..31b04664b17 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include #include diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 8dcd549f9c8..00903abee59 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index 78d972c6e7e..21b2e3e27a1 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -4,12 +4,15 @@ #include #include #include -#include +#include #include namespace DB { + +struct ConnectionTimeouts; + /** * This class represents table engine for external urls. * It sends HTTP GET to server when select is called and diff --git a/src/Storages/StorageXDBC.cpp b/src/Storages/StorageXDBC.cpp index 3aca884d15a..f2f8cdb23f5 100644 --- a/src/Storages/StorageXDBC.cpp +++ b/src/Storages/StorageXDBC.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Storages/System/StorageSystemDistributionQueue.cpp b/src/Storages/System/StorageSystemDistributionQueue.cpp index edba7c13b1c..db649e7e1ba 100644 --- a/src/Storages/System/StorageSystemDistributionQueue.cpp +++ b/src/Storages/System/StorageSystemDistributionQueue.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include namespace DB diff --git a/src/TableFunctions/ITableFunctionXDBC.cpp b/src/TableFunctions/ITableFunctionXDBC.cpp index 67d1257fe4c..e04a86b5abf 100644 --- a/src/TableFunctions/ITableFunctionXDBC.cpp +++ b/src/TableFunctions/ITableFunctionXDBC.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include From 77a1d00dee7157ff89a73a6a10a2cb3a0aa27de1 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 11 Dec 2020 21:10:36 +0300 Subject: [PATCH 2/3] Add new remote context as interpreter context to the Pipe --- src/DataStreams/RemoteQueryExecutor.cpp | 2 +- src/Interpreters/ClusterProxy/IStreamFactory.h | 2 +- src/Interpreters/ClusterProxy/SelectStreamFactory.cpp | 9 +++++++-- src/Interpreters/ClusterProxy/SelectStreamFactory.h | 2 +- src/Interpreters/ClusterProxy/executeQuery.cpp | 9 ++++----- src/Interpreters/ClusterProxy/executeQuery.h | 2 +- src/Storages/getStructureOfRemoteTable.cpp | 2 +- 7 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 62ce1632e83..c38f42893af 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -140,7 +140,7 @@ void RemoteQueryExecutor::sendQuery() multiplexed_connections = create_multiplexed_connections(); - const auto& settings = context.getSettingsRef(); + const auto & settings = context.getSettingsRef(); if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size()) return; diff --git a/src/Interpreters/ClusterProxy/IStreamFactory.h b/src/Interpreters/ClusterProxy/IStreamFactory.h index c0b887e0489..3cf100cd85c 100644 --- a/src/Interpreters/ClusterProxy/IStreamFactory.h +++ b/src/Interpreters/ClusterProxy/IStreamFactory.h @@ -32,7 +32,7 @@ public: virtual void createForShard( const Cluster::ShardInfo & shard_info, const String & query, const ASTPtr & query_ast, - const Context & context, const ThrottlerPtr & throttler, + const std::shared_ptr & context_ptr, const ThrottlerPtr & throttler, const SelectQueryInfo & query_info, std::vector & res, Pipes & remote_pipes, diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 34065c26f3e..e2a7c5b55dc 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -114,13 +114,15 @@ String formattedAST(const ASTPtr & ast) void SelectStreamFactory::createForShard( const Cluster::ShardInfo & shard_info, const String &, const ASTPtr & query_ast, - const Context & context, const ThrottlerPtr & throttler, + const std::shared_ptr & context_ptr, const ThrottlerPtr & throttler, const SelectQueryInfo &, std::vector & plans, Pipes & remote_pipes, Pipes & delayed_pipes, Poco::Logger * log) { + const auto & context = *context_ptr; + bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; bool add_totals = false; bool add_extremes = false; @@ -152,6 +154,7 @@ void SelectStreamFactory::createForShard( remote_query_executor->setMainTable(main_table); remote_pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes)); + remote_pipes.back().addInterpreterContext(context_ptr); }; const auto & settings = context.getSettingsRef(); @@ -243,7 +246,8 @@ void SelectStreamFactory::createForShard( /// Do it lazily to avoid connecting in the main thread. auto lazily_create_stream = [ - pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler, + pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, + &context, context_ptr, throttler, main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables, stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes]() -> Pipe @@ -296,6 +300,7 @@ void SelectStreamFactory::createForShard( }; delayed_pipes.emplace_back(createDelayedPipe(header, lazily_create_stream, add_totals, add_extremes)); + delayed_pipes.back().addInterpreterContext(context_ptr); } else emplace_remote_stream(); diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.h b/src/Interpreters/ClusterProxy/SelectStreamFactory.h index b51ac109a11..596e99b8a18 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.h +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.h @@ -37,7 +37,7 @@ public: void createForShard( const Cluster::ShardInfo & shard_info, const String & query, const ASTPtr & query_ast, - const Context & context, const ThrottlerPtr & throttler, + const std::shared_ptr & context_ptr, const ThrottlerPtr & throttler, const SelectQueryInfo & query_info, std::vector & plans, Pipes & remote_pipes, diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index c79b17eac2a..59cbae67770 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -19,7 +19,7 @@ namespace DB namespace ClusterProxy { -Context updateSettingsForCluster(const Cluster & cluster, const Context & context, const Settings & settings, Poco::Logger * log) +std::shared_ptr updateSettingsForCluster(const Cluster & cluster, const Context & context, const Settings & settings, Poco::Logger * log) { Settings new_settings = settings; new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time); @@ -78,9 +78,8 @@ Context updateSettingsForCluster(const Cluster & cluster, const Context & contex } } - Context new_context(context); - new_context.setSettings(new_settings); - + auto new_context = std::make_shared(context); + new_context->setSettings(new_settings); return new_context; } @@ -99,7 +98,7 @@ void executeQuery( const std::string query = queryToString(query_ast); - Context new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log); + auto new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log); ThrottlerPtr user_level_throttler; if (auto * process_list_element = context.getProcessListElement()) diff --git a/src/Interpreters/ClusterProxy/executeQuery.h b/src/Interpreters/ClusterProxy/executeQuery.h index 0b40c1412a1..8840b82d5b2 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.h +++ b/src/Interpreters/ClusterProxy/executeQuery.h @@ -27,7 +27,7 @@ class IStreamFactory; /// - optimize_skip_unused_shards_nesting /// /// @return new Context with adjusted settings -Context updateSettingsForCluster(const Cluster & cluster, const Context & context, const Settings & settings, Poco::Logger * log = nullptr); +std::shared_ptr updateSettingsForCluster(const Cluster & cluster, const Context & context, const Settings & settings, Poco::Logger * log = nullptr); /// Execute a distributed query, creating a vector of BlockInputStreams, from which the result can be read. /// `stream_factory` object encapsulates the logic of creating streams for a different type of query diff --git a/src/Storages/getStructureOfRemoteTable.cpp b/src/Storages/getStructureOfRemoteTable.cpp index a987e3d4e8a..de5f3924ca9 100644 --- a/src/Storages/getStructureOfRemoteTable.cpp +++ b/src/Storages/getStructureOfRemoteTable.cpp @@ -71,7 +71,7 @@ ColumnsDescription getStructureOfRemoteTableInShard( }; /// Execute remote query without restrictions (because it's not real user query, but part of implementation) - auto input = std::make_shared(shard_info.pool, query, sample_block, new_context); + auto input = std::make_shared(shard_info.pool, query, sample_block, *new_context); input->setPoolMode(PoolMode::GET_ONE); if (!table_func_ptr) input->setMainTable(table_id); From 41bc43325bb7c63369ac206bd2db102b4882f640 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 11 Dec 2020 21:10:37 +0300 Subject: [PATCH 3/3] Make 01290_max_execution_speed_distributed less flaky --- .../01290_max_execution_speed_distributed.sql | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/01290_max_execution_speed_distributed.sql b/tests/queries/0_stateless/01290_max_execution_speed_distributed.sql index 8282390ca90..b0f545838e6 100644 --- a/tests/queries/0_stateless/01290_max_execution_speed_distributed.sql +++ b/tests/queries/0_stateless/01290_max_execution_speed_distributed.sql @@ -1,4 +1,8 @@ -SET max_execution_speed = 1000000, timeout_before_checking_execution_speed = 0.001, max_block_size = 100; +SET max_execution_speed = 1000000; +SET timeout_before_checking_execution_speed = 0.001; +SET max_block_size = 100; + +SET log_queries=1; CREATE TEMPORARY TABLE times (t DateTime); @@ -10,4 +14,10 @@ SELECT max(t) - min(t) >= 1 FROM times; -- Check that the query was also throttled on "remote" servers. SYSTEM FLUSH LOGS; -SELECT DISTINCT query_duration_ms >= 500 FROM system.query_log WHERE event_date >= yesterday() AND query LIKE '%special query for 01290_max_execution_speed_distributed%' AND type = 2; +SELECT DISTINCT query_duration_ms >= 500 +FROM system.query_log +WHERE + event_date >= yesterday() AND + query LIKE '%special query for 01290_max_execution_speed_distributed%' AND + query NOT LIKE '%system.query_log%' AND + type = 2;