Merge pull request #17986 from azat/fwd-decl

More forward declaration for generic headers
This commit is contained in:
alexey-milovidov 2020-12-13 01:12:05 +03:00 committed by GitHub
commit 04e222f6f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 167 additions and 91 deletions

View File

@ -27,6 +27,7 @@
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/UseSSL.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <Interpreters/Context.h>
@ -95,6 +96,7 @@ public:
}
global_context.makeGlobalContext();
global_context.setSettings(settings);
std::cerr << std::fixed << std::setprecision(3);
@ -404,7 +406,7 @@ private:
Stopwatch watch;
RemoteBlockInputStream stream(
*(*connection_entries[connection_index]),
query, {}, global_context, &settings, nullptr, Scalars(), Tables(), query_processing_stage);
query, {}, global_context, nullptr, Scalars(), Tables(), query_processing_stage);
if (!query_id.empty())
stream.setQueryId(query_id);

View File

@ -5,6 +5,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/setThreadName.h>
#include <IO/ConnectionTimeoutsContext.h>
namespace DB
@ -1588,11 +1589,14 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
LOG_DEBUG(log, "All helping tables dropped partition {}", partition_name);
}
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings)
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
{
Context remote_context(context);
remote_context.setSettings(settings);
String query = "SHOW CREATE TABLE " + getQuotedTable(table);
Block block = getBlockWithAllStreamData(std::make_shared<RemoteBlockInputStream>(
connection, query, InterpreterShowCreateQuery::getSampleBlock(), context, settings));
connection, query, InterpreterShowCreateQuery::getSampleBlock(), remote_context));
return typeid_cast<const ColumnString &>(*block.safeGetByPosition(0).column).getDataAt(0).toString();
}
@ -1604,7 +1608,7 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time
String create_query_pull_str = getRemoteCreateTable(
task_shard.task_table.table_pull,
*connection_entry,
&task_cluster->settings_pull);
task_cluster->settings_pull);
ParserCreateQuery parser_create_query;
const auto & settings = context.getSettingsRef();
@ -1856,6 +1860,9 @@ UInt64 ClusterCopier::executeQueryOnCluster(
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(shard_settings).getSaturated(shard_settings.max_execution_time);
auto connections = shard.pool->getMany(timeouts, &shard_settings, pool_mode);
Context shard_context(context);
shard_context.setSettings(shard_settings);
for (auto & connection : connections)
{
if (connection.isNull())
@ -1864,7 +1871,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
try
{
/// CREATE TABLE and DROP PARTITION queries return empty block
RemoteBlockInputStream stream{*connection, query, Block{}, context, &shard_settings};
RemoteBlockInputStream stream{*connection, query, Block{}, shard_context};
NullBlockOutputStream output{Block{}};
copyData(stream, output);

View File

@ -154,7 +154,7 @@ protected:
/// table we can get rid of partition pieces (partitions in helping tables).
void dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name);
String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings = nullptr);
String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings);
ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard);

View File

@ -1,6 +1,7 @@
#include "ClusterCopierApp.h"
#include <Common/StatusFile.h>
#include <Common/TerminalSize.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Formats/registerFormats.h>
#include <unistd.h>

View File

@ -1,6 +1,7 @@
#pragma once
#include "Aliases.h"
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
@ -12,7 +13,9 @@ namespace ErrorCodes
struct TaskCluster
{
TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_)
: task_zookeeper_path(task_zookeeper_path_), default_local_database(default_local_database_) {}
: task_zookeeper_path(task_zookeeper_path_)
, default_local_database(default_local_database_)
{}
void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");

View File

@ -1,5 +1,6 @@
#include <Poco/Net/NetException.h>
#include <Core/Defines.h>
#include <Core/Settings.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>

View File

@ -5,6 +5,7 @@
#include <Poco/Net/StreamSocket.h>
#include <Common/Throttler.h>
#include <Common/config.h>
#include <Core/Block.h>
#include <Core/Defines.h>
@ -17,7 +18,6 @@
#include <IO/ConnectionTimeouts.h>
#include <Core/Settings.h>
#include <Interpreters/TablesStatus.h>
#include <Compression/ICompressionCodec.h>
@ -31,6 +31,7 @@ namespace DB
class ClientInfo;
class Pipe;
struct Settings;
/// Struct which represents data we are going to send for external table.
struct ExternalTableData

View File

@ -1,9 +1,9 @@
#pragma once
#include <Common/PoolBase.h>
#include <Client/Connection.h>
#include <IO/ConnectionTimeouts.h>
#include <Core/Settings.h>
namespace DB
{

View File

@ -4,6 +4,7 @@
#include <Poco/Net/DNS.h>
#include <Common/BitHelpers.h>
#include <Common/quoteString.h>
#include <common/getFQDNOrHostName.h>
#include <Common/isLocalAddress.h>
#include <Common/ProfileEvents.h>

View File

@ -12,6 +12,7 @@
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <common/logger_useful.h>
#include <ext/range.h>

View File

@ -6,27 +6,27 @@ namespace DB
RemoteBlockInputStream::RemoteBlockInputStream(
Connection & connection,
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const String & query_, const Block & header_, const Context & context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query_executor(connection, query_, header_, context_, settings, throttler, scalars_, external_tables_, stage_)
: query_executor(connection, query_, header_, context_, throttler, scalars_, external_tables_, stage_)
{
init();
}
RemoteBlockInputStream::RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const String & query_, const Block & header_, const Context & context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query_executor(std::move(connections), query_, header_, context_, settings, throttler, scalars_, external_tables_, stage_)
: query_executor(std::move(connections), query_, header_, context_, throttler, scalars_, external_tables_, stage_)
{
init();
}
RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const String & query_, const Block & header_, const Context & context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query_executor(pool, query_, header_, context_, settings, throttler, scalars_, external_tables_, stage_)
: query_executor(pool, query_, header_, context_, throttler, scalars_, external_tables_, stage_)
{
init();
}

View File

@ -6,7 +6,6 @@
#include <DataStreams/IBlockInputStream.h>
#include <Common/Throttler.h>
#include <Interpreters/Context.h>
#include <Client/ConnectionPool.h>
#include <Client/MultiplexedConnections.h>
#include <Interpreters/Cluster.h>
@ -16,32 +15,31 @@
namespace DB
{
class Context;
/** This class allows one to launch queries on remote replicas of one shard and get results
*/
class RemoteBlockInputStream : public IBlockInputStream
{
public:
/// Takes already set connection.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
Connection & connection,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const String & query_, const Block & header_, const Context & context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Accepts several connections already taken from pool.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const String & query_, const Block & header_, const Context & context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool and gets one or several connections from it.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const String & query_, const Block & header_, const Context & context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);

View File

@ -8,7 +8,9 @@
#include <Storages/SelectQueryInfo.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <IO/ConnectionTimeoutsContext.h>
namespace DB
{
@ -20,14 +22,11 @@ namespace ErrorCodes
RemoteQueryExecutor::RemoteQueryExecutor(
Connection & connection,
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const String & query_, const Block & header_, const Context & context_,
ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
{
if (settings)
context.setSettings(*settings);
create_multiplexed_connections = [this, &connection, throttler]()
{
return std::make_unique<MultiplexedConnections>(connection, context.getSettingsRef(), throttler);
@ -36,14 +35,11 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const String & query_, const Block & header_, const Context & context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
{
if (settings)
context.setSettings(*settings);
create_multiplexed_connections = [this, connections, throttler]() mutable
{
return std::make_unique<MultiplexedConnections>(
@ -53,14 +49,11 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const String & query_, const Block & header_, const Context & context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
{
if (settings)
context.setSettings(*settings);
create_multiplexed_connections = [this, pool, throttler]()
{
const Settings & current_settings = context.getSettingsRef();
@ -147,7 +140,7 @@ void RemoteQueryExecutor::sendQuery()
multiplexed_connections = create_multiplexed_connections();
const auto& settings = context.getSettingsRef();
const auto & settings = context.getSettingsRef();
if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
return;

View File

@ -1,12 +1,15 @@
#pragma once
#include <Interpreters/Context.h>
#include <Client/ConnectionPool.h>
#include <Client/MultiplexedConnections.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/StorageID.h>
namespace DB
{
class Context;
class Throttler;
using ThrottlerPtr = std::shared_ptr<Throttler>;
@ -21,26 +24,23 @@ class RemoteQueryExecutor
{
public:
/// Takes already set connection.
/// If `settings` is nullptr, settings will be taken from context.
RemoteQueryExecutor(
Connection & connection,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const String & query_, const Block & header_, const Context & context_,
ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Accepts several connections already taken from pool.
/// If `settings` is nullptr, settings will be taken from context.
RemoteQueryExecutor(
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const String & query_, const Block & header_, const Context & context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool and gets one or several connections from it.
/// If `settings` is nullptr, settings will be taken from context.
RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const String & query_, const Block & header_, const Context & context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
@ -93,7 +93,7 @@ private:
const String query;
String query_id = "";
Context context;
const Context & context;
ProgressCallback progress_callback;
ProfileInfoCallback profile_info_callback;

View File

@ -2,6 +2,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>

View File

@ -6,6 +6,7 @@
#include <Formats/FormatFactory.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/Context.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Util/AbstractConfiguration.h>

View File

@ -17,6 +17,8 @@
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
#include <Common/thread_local_rng.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>

View File

@ -16,7 +16,6 @@ namespace DB
class Block;
class Context;
struct FormatSettings;
struct Settings;
struct FormatFactorySettings;

View File

@ -2,7 +2,6 @@
#include <Functions/IFunctionAdaptors.h>
#include <Common/IFactoryWithAliases.h>
#include <Interpreters/Context.h>
#include <functional>
#include <memory>

View File

@ -1,5 +1,6 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
namespace DB

View File

@ -1,14 +1,13 @@
#pragma once
#include <Poco/Timespan.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
class Context;
struct Settings;
struct ConnectionTimeouts
{
Poco::Timespan connection_timeout;
@ -92,24 +91,10 @@ struct ConnectionTimeouts
}
/// Timeouts for the case when we have just single attempt to connect.
static ConnectionTimeouts getTCPTimeoutsWithoutFailover(const Settings & settings)
{
return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout);
}
static ConnectionTimeouts getTCPTimeoutsWithoutFailover(const Settings & settings);
/// Timeouts for the case when we will try many addresses in a loop.
static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings)
{
return ConnectionTimeouts(settings.connect_timeout_with_failover_ms, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout, 0, settings.connect_timeout_with_failover_secure_ms);
}
static ConnectionTimeouts getHTTPTimeouts(const Context & context)
{
const auto & settings = context.getSettingsRef();
const auto & config = context.getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout, settings.tcp_keep_alive_timeout, http_keep_alive_timeout);
}
static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings);
static ConnectionTimeouts getHTTPTimeouts(const Context & context);
};
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <IO/ConnectionTimeouts.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace DB
{
/// Timeouts for the case when we have just single attempt to connect.
inline ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithoutFailover(const Settings & settings)
{
return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout);
}
/// Timeouts for the case when we will try many addresses in a loop.
inline ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithFailover(const Settings & settings)
{
return ConnectionTimeouts(settings.connect_timeout_with_failover_ms, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout, 0, settings.connect_timeout_with_failover_secure_ms);
}
inline ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(const Context & context)
{
const auto & settings = context.getSettingsRef();
const auto & config = context.getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout, settings.tcp_keep_alive_timeout, http_keep_alive_timeout);
}
}

View File

@ -19,6 +19,7 @@
# include <IO/S3/PocoHTTPClient.h>
# include <Poco/URI.h>
# include <re2/re2.h>
# include <boost/algorithm/string/case_conv.hpp>
# include <common/logger_useful.h>
namespace

View File

@ -5,6 +5,7 @@
#include <Common/isLocalAddress.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/parseAddress.h>
#include <Core/Settings.h>
#include <IO/HexWriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>

View File

@ -1,13 +1,23 @@
#pragma once
#include <map>
#include <Core/Settings.h>
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <Poco/Net/SocketAddress.h>
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
}
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;

View File

@ -32,7 +32,7 @@ public:
virtual void createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
const std::shared_ptr<Context> & context_ptr, const ThrottlerPtr & throttler,
const SelectQueryInfo & query_info,
std::vector<QueryPlanPtr> & res,
Pipes & remote_pipes,

View File

@ -7,6 +7,7 @@
#include <Common/ProfileEvents.h>
#include <Common/checkStackSize.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <common/logger_useful.h>
#include <Processors/Pipe.h>
@ -113,13 +114,15 @@ String formattedAST(const ASTPtr & ast)
void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String &, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
const std::shared_ptr<Context> & context_ptr, const ThrottlerPtr & throttler,
const SelectQueryInfo &,
std::vector<QueryPlanPtr> & plans,
Pipes & remote_pipes,
Pipes & delayed_pipes,
Poco::Logger * log)
{
const auto & context = *context_ptr;
bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
bool add_extremes = false;
@ -143,7 +146,7 @@ void SelectStreamFactory::createForShard(
auto emplace_remote_stream = [&]()
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
shard_info.pool, modified_query, header, context, throttler, scalars, external_tables, processed_stage);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
@ -151,6 +154,7 @@ void SelectStreamFactory::createForShard(
remote_query_executor->setMainTable(main_table);
remote_pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes));
remote_pipes.back().addInterpreterContext(context_ptr);
};
const auto & settings = context.getSettingsRef();
@ -242,7 +246,8 @@ void SelectStreamFactory::createForShard(
/// Do it lazily to avoid connecting in the main thread.
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast,
&context, context_ptr, throttler,
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes]()
-> Pipe
@ -288,13 +293,14 @@ void SelectStreamFactory::createForShard(
connections.emplace_back(std::move(try_result.entry));
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::move(connections), modified_query, header, context, nullptr, throttler, scalars, external_tables, stage);
std::move(connections), modified_query, header, context, throttler, scalars, external_tables, stage);
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes);
}
};
delayed_pipes.emplace_back(createDelayedPipe(header, lazily_create_stream, add_totals, add_extremes));
delayed_pipes.back().addInterpreterContext(context_ptr);
}
else
emplace_remote_stream();

View File

@ -37,7 +37,7 @@ public:
void createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
const std::shared_ptr<Context> & context_ptr, const ThrottlerPtr & throttler,
const SelectQueryInfo & query_info,
std::vector<QueryPlanPtr> & plans,
Pipes & remote_pipes,

View File

@ -19,7 +19,7 @@ namespace DB
namespace ClusterProxy
{
Context updateSettingsForCluster(const Cluster & cluster, const Context & context, const Settings & settings, Poco::Logger * log)
std::shared_ptr<Context> updateSettingsForCluster(const Cluster & cluster, const Context & context, const Settings & settings, Poco::Logger * log)
{
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);
@ -78,9 +78,8 @@ Context updateSettingsForCluster(const Cluster & cluster, const Context & contex
}
}
Context new_context(context);
new_context.setSettings(new_settings);
auto new_context = std::make_shared<Context>(context);
new_context->setSettings(new_settings);
return new_context;
}
@ -99,7 +98,7 @@ void executeQuery(
const std::string query = queryToString(query_ast);
Context new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log);
auto new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log);
ThrottlerPtr user_level_throttler;
if (auto * process_list_element = context.getProcessListElement())

View File

@ -27,7 +27,7 @@ class IStreamFactory;
/// - optimize_skip_unused_shards_nesting
///
/// @return new Context with adjusted settings
Context updateSettingsForCluster(const Cluster & cluster, const Context & context, const Settings & settings, Poco::Logger * log = nullptr);
std::shared_ptr<Context> updateSettingsForCluster(const Cluster & cluster, const Context & context, const Settings & settings, Poco::Logger * log = nullptr);
/// Execute a distributed query, creating a vector of BlockInputStreams, from which the result can be read.
/// `stream_factory` object encapsulates the logic of creating streams for a different type of query

View File

@ -1,6 +1,7 @@
#pragma once
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <DataStreams/BlockIO.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>

View File

@ -2,7 +2,6 @@
#include <Core/Block.h>
#include <Core/ColumnNumbers.h>
#include <Core/Settings.h>
#include <Interpreters/ActionsDAG.h>
#include <variant>

View File

@ -112,6 +112,11 @@ bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column)
return true;
}
ExpressionAnalyzer::ExtractedSettings::ExtractedSettings(const Settings & settings_)
: use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries)
, size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode)
{}
ExpressionAnalyzer::ExpressionAnalyzer(
const ASTPtr & query_,

View File

@ -1,6 +1,5 @@
#pragma once
#include <Core/Settings.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Columns/FilterDescription.h>
#include <Interpreters/AggregateDescription.h>
@ -16,6 +15,7 @@ namespace DB
class Block;
class Context;
struct Settings;
struct ExpressionActionsChain;
class ExpressionActions;
@ -80,10 +80,7 @@ private:
const bool use_index_for_in_with_subqueries;
const SizeLimits size_limits_for_set;
ExtractedSettings(const Settings & settings_)
: use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries),
size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode)
{}
ExtractedSettings(const Settings & settings_);
};
public:

View File

@ -13,6 +13,7 @@
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/JoinedTables.h>

View File

@ -5,6 +5,7 @@
#include <Parsers/queryToString.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Core/Settings.h>
using namespace DB;

View File

@ -5,7 +5,6 @@
#include <Core/Defines.h>
#include <common/types.h>
#include <Core/Settings.h>
#include <IO/WriteHelpers.h>
#include <Parsers/IAST.h>
#include <Parsers/TokenIterator.h>

View File

@ -6,6 +6,7 @@
#include <Parsers/ParserSetQuery.h>
#include <Common/typeid_cast.h>
#include <Common/SettingsChanges.h>
namespace DB

View File

@ -7,6 +7,8 @@
namespace DB
{
struct SettingChange;
/** Query like this:
* SET name1 = value1, name2 = value2, ...
*/

View File

@ -36,6 +36,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/QueryParameterVisitor.h>
#include <Interpreters/Context.h>
#include <Common/typeid_cast.h>
#include <Poco/Net/HTTPStream.h>

View File

@ -11,6 +11,7 @@
#include <IO/ReadBufferFromIStream.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <Interpreters/InterserverIOHandler.h>
#include <Interpreters/Context.h>
#include "IServer.h"
namespace DB

View File

@ -10,6 +10,7 @@
#include <IO/copyData.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>

View File

@ -17,6 +17,7 @@
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/Operators.h>
#include <boost/algorithm/string/find_iterator.hpp>

View File

@ -10,6 +10,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/RemoteBlockOutputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>

View File

@ -4,6 +4,7 @@
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>
#include <Core/Settings.h>
namespace DB

View File

@ -9,6 +9,7 @@
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>

View File

@ -2,7 +2,6 @@
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <Poco/Semaphore.h>
#include <ext/shared_ptr_helper.h>
#include <mutex>
@ -19,6 +18,8 @@
namespace DB
{
class Context;
using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;
class StorageRabbitMQ final: public ext::shared_ptr_helper<StorageRabbitMQ>, public IStorage

View File

@ -42,6 +42,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/PartLog.h>

View File

@ -10,6 +10,8 @@
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Formats/FormatFactory.h>

View File

@ -4,12 +4,15 @@
#include <Poco/URI.h>
#include <ext/shared_ptr_helper.h>
#include <DataStreams/IBlockOutputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
namespace DB
{
struct ConnectionTimeouts;
/**
* This class represents table engine for external urls.
* It sends HTTP GET to server when select is called and

View File

@ -3,6 +3,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>

View File

@ -9,6 +9,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <Access/ContextAccess.h>
#include <Common/typeid_cast.h>
#include <Interpreters/Context.h>
#include <Databases/IDatabase.h>
namespace DB

View File

@ -71,7 +71,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
};
/// Execute remote query without restrictions (because it's not real user query, but part of implementation)
auto input = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, sample_block, new_context);
auto input = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, sample_block, *new_context);
input->setPoolMode(PoolMode::GET_ONE);
if (!table_func_ptr)
input->setMainTable(table_id);

View File

@ -5,6 +5,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>

View File

@ -1,4 +1,8 @@
SET max_execution_speed = 1000000, timeout_before_checking_execution_speed = 0.001, max_block_size = 100;
SET max_execution_speed = 1000000;
SET timeout_before_checking_execution_speed = 0.001;
SET max_block_size = 100;
SET log_queries=1;
CREATE TEMPORARY TABLE times (t DateTime);
@ -10,4 +14,10 @@ SELECT max(t) - min(t) >= 1 FROM times;
-- Check that the query was also throttled on "remote" servers.
SYSTEM FLUSH LOGS;
SELECT DISTINCT query_duration_ms >= 500 FROM system.query_log WHERE event_date >= yesterday() AND query LIKE '%special query for 01290_max_execution_speed_distributed%' AND type = 2;
SELECT DISTINCT query_duration_ms >= 500
FROM system.query_log
WHERE
event_date >= yesterday() AND
query LIKE '%special query for 01290_max_execution_speed_distributed%' AND
query NOT LIKE '%system.query_log%' AND
type = 2;