Fixed error with selecting number of threads for distributed query processing [#CLICKHOUSE-3115].

This commit is contained in:
Alexey Milovidov 2017-07-04 00:04:10 +03:00
parent 15cf838250
commit 727ce48c23
3 changed files with 12 additions and 14 deletions

View File

@ -79,6 +79,8 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input, const Names & requi
throw Exception("Too deep subqueries. Maximum: " + settings.limits.max_subquery_depth.toString(),
ErrorCodes::TOO_DEEP_SUBQUERIES);
max_streams = settings.max_threads;
if (is_first_select_inside_union_all)
{
/// Create a SELECT query chain.
@ -785,7 +787,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
getLimitLengthAndOffset(query, limit_length, limit_offset);
size_t max_block_size = settings.max_block_size;
size_t max_streams = settings.max_threads;
/** With distributed query processing, almost no computations are done in the threads,
* but wait and receive data from remote servers.
@ -793,10 +794,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
* connect and ask only 8 servers at a time.
* To simultaneously query more remote servers,
* instead of max_threads, max_distributed_connections is used.
*
* Save the initial value of max_threads in settings_for_storage
* - these settings will be passed to remote servers for distributed query processing,
* and there must be an original value of max_threads, not an increased value.
*/
bool is_remote = false;
if (storage && storage->isRemote())
@ -947,10 +944,10 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression,
{
streams[0] = std::make_shared<ParallelAggregatingBlockInputStream>(
streams, stream_with_non_joined_data, params, final,
settings.max_threads,
max_streams,
settings.aggregation_memory_efficient_merge_threads
? settings.aggregation_memory_efficient_merge_threads
: settings.max_threads);
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: max_streams);
stream_with_non_joined_data = nullptr;
streams.resize(1);
@ -1004,12 +1001,12 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina
executeUnion();
/// Now merge the aggregated blocks
streams[0] = std::make_shared<MergingAggregatedBlockInputStream>(streams[0], params, final, settings.max_threads);
streams[0] = std::make_shared<MergingAggregatedBlockInputStream>(streams[0], params, final, max_streams);
}
else
{
streams[0] = std::make_shared<MergingAggregatedMemoryEfficientBlockInputStream>(streams, params, final,
settings.max_threads,
max_streams,
settings.aggregation_memory_efficient_merge_threads
? size_t(settings.aggregation_memory_efficient_merge_threads)
: size_t(settings.max_threads));
@ -1185,9 +1182,7 @@ void InterpreterSelectQuery::executeUnion()
/// If there are still several streams, then we combine them into one
if (hasMoreThanOneStream())
{
const Settings & settings = context.getSettingsRef();
streams[0] = std::make_shared<UnionBlockInputStream<>>(streams, stream_with_non_joined_data, settings.max_threads);
streams[0] = std::make_shared<UnionBlockInputStream<>>(streams, stream_with_non_joined_data, max_streams);
stream_with_non_joined_data = nullptr;
streams.resize(1);
union_within_single_query = false;

View File

@ -172,6 +172,9 @@ private:
std::unique_ptr<ExpressionAnalyzer> query_analyzer;
NamesAndTypesList table_column_names;
/// How many streams we ask for storage to produce, and in how many threads we will do further processing.
size_t max_streams = 1;
/** Streams of data.
* The source data streams are produced in the executeFetchColumns function.
* Then they are converted (wrapped in other streams) using the `execute*` functions,

View File

@ -223,7 +223,7 @@ BlockInputStreams StorageDistributed::read(
bool enable_shard_multiplexing = false;
ClusterProxy::SelectQueryConstructor select_query_constructor(
processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables);
processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables);
return ClusterProxy::Query{select_query_constructor, cluster, modified_query_ast,
context, settings, enable_shard_multiplexing}.execute();