mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 09:10:48 +00:00
Merge pull request #10399 from ClickHouse/fix-distributed_group_by_no_merge-segfault
Fix segfault with distributed_group_by_no_merge
This commit is contained in:
commit
fcf5c3c6e1
@ -70,7 +70,9 @@ SelectStreamFactory::SelectStreamFactory(
|
||||
namespace
|
||||
{
|
||||
|
||||
Pipe createLocalStream(const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage, bool force_tree_shaped_pipeline)
|
||||
Pipe createLocalStream(
|
||||
const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage,
|
||||
bool add_totals_port, bool add_extremes_port, bool force_tree_shaped_pipeline)
|
||||
{
|
||||
checkStackSize();
|
||||
|
||||
@ -83,12 +85,10 @@ Pipe createLocalStream(const ASTPtr & query_ast, const Block & header, const Con
|
||||
auto stream = interpreter.execute().in;
|
||||
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
|
||||
|
||||
bool add_totals_and_extremes_port = processed_stage == QueryProcessingStage::Complete;
|
||||
if (add_totals_and_extremes_port)
|
||||
{
|
||||
if (add_totals_port)
|
||||
source->addTotalsPort();
|
||||
if (add_extremes_port)
|
||||
source->addExtremesPort();
|
||||
}
|
||||
|
||||
Pipe pipe(std::move(source));
|
||||
|
||||
@ -138,7 +138,13 @@ void SelectStreamFactory::createForShard(
|
||||
Pipes & res)
|
||||
{
|
||||
bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
|
||||
bool add_totals_and_extremes_port = processed_stage == QueryProcessingStage::Complete;
|
||||
bool add_totals_port = false;
|
||||
bool add_extremes_port = false;
|
||||
if (processed_stage == QueryProcessingStage::Complete)
|
||||
{
|
||||
add_totals_port = query_ast->as<ASTSelectQuery &>().group_by_with_totals;
|
||||
add_extremes_port = context.getSettingsRef().extremes;
|
||||
}
|
||||
|
||||
auto modified_query_ast = query_ast->clone();
|
||||
if (has_virtual_shard_num_column)
|
||||
@ -146,7 +152,8 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
auto emplace_local_stream = [&]()
|
||||
{
|
||||
res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage, query_info.force_tree_shaped_pipeline));
|
||||
res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage,
|
||||
add_totals_port, add_extremes_port, query_info.force_tree_shaped_pipeline));
|
||||
};
|
||||
|
||||
String modified_query = formattedAST(modified_query_ast);
|
||||
@ -161,11 +168,10 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
auto source = std::make_shared<SourceFromInputStream>(std::move(stream), force_add_agg_info);
|
||||
|
||||
if (add_totals_and_extremes_port)
|
||||
{
|
||||
if (add_totals_port)
|
||||
source->addTotalsPort();
|
||||
if (add_extremes_port)
|
||||
source->addExtremesPort();
|
||||
}
|
||||
|
||||
res.emplace_back(std::move(source));
|
||||
};
|
||||
@ -265,7 +271,7 @@ void SelectStreamFactory::createForShard(
|
||||
auto lazily_create_stream = [
|
||||
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
|
||||
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
|
||||
stage = processed_stage, local_delay]()
|
||||
stage = processed_stage, local_delay, add_totals_port, add_extremes_port]()
|
||||
-> BlockInputStreamPtr
|
||||
{
|
||||
auto current_settings = context.getSettingsRef();
|
||||
@ -298,7 +304,8 @@ void SelectStreamFactory::createForShard(
|
||||
}
|
||||
|
||||
if (try_results.empty() || local_delay < max_remote_delay)
|
||||
return std::make_shared<TreeExecutorBlockInputStream>(createLocalStream(modified_query_ast, header, context, stage, true));
|
||||
return std::make_shared<TreeExecutorBlockInputStream>(
|
||||
createLocalStream(modified_query_ast, header, context, stage, add_totals_port, add_extremes_port, true));
|
||||
else
|
||||
{
|
||||
std::vector<IConnectionPool::Entry> connections;
|
||||
@ -314,11 +321,10 @@ void SelectStreamFactory::createForShard(
|
||||
auto lazy_stream = std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream);
|
||||
auto source = std::make_shared<SourceFromInputStream>(std::move(lazy_stream), force_add_agg_info);
|
||||
|
||||
if (add_totals_and_extremes_port)
|
||||
{
|
||||
if (add_totals_port)
|
||||
source->addTotalsPort();
|
||||
if (add_extremes_port)
|
||||
source->addExtremesPort();
|
||||
}
|
||||
|
||||
res.emplace_back(std::move(source));
|
||||
}
|
||||
|
@ -1713,7 +1713,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
|
||||
|
||||
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
|
||||
|
||||
pipeline.dropTotalsIfHas();
|
||||
/// Forget about current totals and extremes. They will be calculated again after aggregation if needed.
|
||||
pipeline.dropTotalsAndExtremes();
|
||||
|
||||
/// If there are several sources, then we perform parallel aggregation
|
||||
if (pipeline.getNumStreams() > 1)
|
||||
|
@ -401,18 +401,24 @@ void QueryPipeline::addTotals(ProcessorPtr source)
|
||||
assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline");
|
||||
|
||||
totals_having_port = &source->getOutputs().front();
|
||||
processors.emplace_back(source);
|
||||
processors.emplace_back(std::move(source));
|
||||
}
|
||||
|
||||
void QueryPipeline::dropTotalsIfHas()
|
||||
void QueryPipeline::dropTotalsAndExtremes()
|
||||
{
|
||||
if (totals_having_port)
|
||||
auto drop_port = [&](OutputPort *& port)
|
||||
{
|
||||
auto null_sink = std::make_shared<NullSink>(totals_having_port->getHeader());
|
||||
connect(*totals_having_port, null_sink->getPort());
|
||||
auto null_sink = std::make_shared<NullSink>(port->getHeader());
|
||||
connect(*port, null_sink->getPort());
|
||||
processors.emplace_back(std::move(null_sink));
|
||||
totals_having_port = nullptr;
|
||||
}
|
||||
port = nullptr;
|
||||
};
|
||||
|
||||
if (totals_having_port)
|
||||
drop_port(totals_having_port);
|
||||
|
||||
if (extremes_port)
|
||||
drop_port(extremes_port);
|
||||
}
|
||||
|
||||
void QueryPipeline::addExtremesTransform()
|
||||
|
@ -109,7 +109,8 @@ public:
|
||||
/// Add already calculated totals.
|
||||
void addTotals(ProcessorPtr source);
|
||||
|
||||
void dropTotalsIfHas();
|
||||
/// Forget about current totals and extremes. It is needed before aggregation, cause they will be calculated again.
|
||||
void dropTotalsAndExtremes();
|
||||
|
||||
/// Will read from this stream after all data was read from other streams.
|
||||
void addDelayedStream(ProcessorPtr source);
|
||||
|
@ -0,0 +1,289 @@
|
||||
20
|
||||
20
|
||||
|
||||
20
|
||||
distributed_group_by_no_merge = 0, extremes = 0
|
||||
10
|
||||
-
|
||||
10
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
------
|
||||
10
|
||||
-
|
||||
10
|
||||
-
|
||||
20
|
||||
-
|
||||
20
|
||||
------
|
||||
10
|
||||
|
||||
10
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
distributed_group_by_no_merge = 1, extremes = 0
|
||||
10
|
||||
-
|
||||
10
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
------
|
||||
10
|
||||
-
|
||||
10
|
||||
-
|
||||
20
|
||||
-
|
||||
20
|
||||
------
|
||||
10
|
||||
|
||||
10
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
distributed_group_by_no_merge = 0, extremes = 1
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
------
|
||||
|
||||
------
|
||||
------
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
20
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
20
|
||||
------
|
||||
|
||||
------
|
||||
------
|
||||
10
|
||||
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
|
||||
20
|
||||
20
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
|
||||
20
|
||||
20
|
||||
distributed_group_by_no_merge = 1, extremes = 1
|
||||
|
||||
distributed_group_by_no_merge = 1, extremes = 1
|
||||
distributed_group_by_no_merge = 1, extremes = 1
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
------
|
||||
|
||||
------
|
||||
------
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
20
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
20
|
||||
------
|
||||
|
||||
------
|
||||
------
|
||||
10
|
||||
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
10
|
||||
|
||||
10
|
||||
|
||||
10
|
||||
10
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
|
||||
20
|
||||
20
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
20
|
||||
|
||||
20
|
||||
|
||||
20
|
||||
20
|
@ -0,0 +1,106 @@
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.{1,2}', system.numbers) LIMIT 5 SETTINGS distributed_group_by_no_merge = 1);
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.{1,2}', system.numbers) LIMIT 5 SETTINGS distributed_group_by_no_merge = 1) with totals;
|
||||
|
||||
SELECT 'distributed_group_by_no_merge = 0, extremes = 0';
|
||||
SET distributed_group_by_no_merge = 0, extremes = 0;
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 5);
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 5);
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 5) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 5) with totals;
|
||||
SELECT '------';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.1', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.2', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{1,2}', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{2,3}', numbers(5)) with totals);
|
||||
SELECT '------';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.1', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.2', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{1,2}', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{2,3}', numbers(5))) with totals;
|
||||
|
||||
SELECT 'distributed_group_by_no_merge = 1, extremes = 0';
|
||||
SET distributed_group_by_no_merge = 1, extremes = 0;
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 5);
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 5);
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 5) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 5) with totals;
|
||||
SELECT '------';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.1', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.2', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{1,2}', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{2,3}', numbers(5)) with totals);
|
||||
SELECT '------';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.1', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.2', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{1,2}', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{2,3}', numbers(5))) with totals;
|
||||
|
||||
SELECT 'distributed_group_by_no_merge = 0, extremes = 1';
|
||||
SET distributed_group_by_no_merge = 0, extremes = 1;
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 5);
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 5);
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 5) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 5) with totals;
|
||||
SELECT '------';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.1', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.2', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{1,2}', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{2,3}', numbers(5)) with totals);
|
||||
SELECT '------';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.1', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.2', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{1,2}', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{2,3}', numbers(5))) with totals;
|
||||
|
||||
SELECT 'distributed_group_by_no_merge = 1, extremes = 1';
|
||||
SET distributed_group_by_no_merge = 1, extremes = 1;
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 5);
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 5);
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 5) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(number) FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 5) with totals;
|
||||
SELECT '------';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.1', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.2', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{1,2}', numbers(5)) with totals);
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{2,3}', numbers(5)) with totals);
|
||||
SELECT '------';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.1', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.2', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{1,2}', numbers(5))) with totals;
|
||||
SELECT '-';
|
||||
SELECT sum(s) FROM (SELECT sum(number) as s FROM remote('127.0.0.{2,3}', numbers(5))) with totals;
|
Loading…
Reference in New Issue
Block a user