mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #24252 from ClickHouse/try-fix-group-by-shard-num-in-another-way
Try to fix GROUP BY _shard_num in a different way
This commit is contained in:
commit
d09f3bcd0d
@ -1041,11 +1041,19 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
|
||||
{
|
||||
auto & input = inputs[res_elem.name];
|
||||
if (input.empty())
|
||||
throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
|
||||
ErrorCodes::THERE_IS_NO_COLUMN);
|
||||
|
||||
src_node = dst_node = actions_dag->inputs[input.front()];
|
||||
input.pop_front();
|
||||
{
|
||||
const auto * res_const = typeid_cast<const ColumnConst *>(res_elem.column.get());
|
||||
if (ignore_constant_values && res_const)
|
||||
src_node = dst_node = &actions_dag->addColumn(res_elem);
|
||||
else
|
||||
throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
|
||||
ErrorCodes::THERE_IS_NO_COLUMN);
|
||||
}
|
||||
else
|
||||
{
|
||||
src_node = dst_node = actions_dag->inputs[input.front()];
|
||||
input.pop_front();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -8,11 +8,13 @@
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <IO/ConnectionTimeoutsContext.h>
|
||||
#include <Interpreters/RequiredSourceColumnsVisitor.h>
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <Processors/Pipe.h>
|
||||
#include <Processors/Sources/RemoteSource.h>
|
||||
#include <Processors/Sources/DelayedSource.h>
|
||||
#include <Processors/Transforms/ExpressionTransform.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
|
||||
@ -73,6 +75,95 @@ SelectStreamFactory::SelectStreamFactory(
|
||||
namespace
|
||||
{
|
||||
|
||||
/// Special support for the case when `_shard_num` column is used in GROUP BY key expression.
|
||||
/// This column is a constant for shard.
|
||||
/// Constant expression with this column may be removed from intermediate header.
|
||||
/// However, this column is not constant for initiator, and it expect intermediate header has it.
|
||||
///
|
||||
/// To fix it, the following trick is applied.
|
||||
/// We check all GROUP BY keys which depend only on `_shard_num`.
|
||||
/// Calculate such expression for current shard if it is used in header.
|
||||
/// Those columns will be added to modified header as already known constants.
|
||||
///
|
||||
/// For local shard, missed constants will be added by converting actions.
|
||||
/// For remote shard, RemoteQueryExecutor will automatically add missing constant.
|
||||
Block evaluateConstantGroupByKeysWithShardNumber(
|
||||
const ContextPtr & context, const ASTPtr & query_ast, const Block & header, UInt32 shard_num)
|
||||
{
|
||||
Block res;
|
||||
|
||||
ColumnWithTypeAndName shard_num_col;
|
||||
shard_num_col.type = std::make_shared<DataTypeUInt32>();
|
||||
shard_num_col.column = shard_num_col.type->createColumnConst(0, shard_num);
|
||||
shard_num_col.name = "_shard_num";
|
||||
|
||||
if (auto group_by = query_ast->as<ASTSelectQuery &>().groupBy())
|
||||
{
|
||||
for (const auto & elem : group_by->children)
|
||||
{
|
||||
String key_name = elem->getColumnName();
|
||||
if (header.has(key_name))
|
||||
{
|
||||
auto ast = elem->clone();
|
||||
|
||||
RequiredSourceColumnsVisitor::Data columns_context;
|
||||
RequiredSourceColumnsVisitor(columns_context).visit(ast);
|
||||
|
||||
auto required_columns = columns_context.requiredColumns();
|
||||
if (required_columns.size() != 1 || required_columns.count("_shard_num") == 0)
|
||||
continue;
|
||||
|
||||
Block block({shard_num_col});
|
||||
auto syntax_result = TreeRewriter(context).analyze(ast, {NameAndTypePair{shard_num_col.name, shard_num_col.type}});
|
||||
ExpressionAnalyzer(ast, syntax_result, context).getActions(true, false)->execute(block);
|
||||
|
||||
res.insert(block.getByName(key_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// We always add _shard_num constant just in case.
|
||||
/// For initial query it is considered as a column from table, and may be required by intermediate block.
|
||||
if (!res.has(shard_num_col.name))
|
||||
res.insert(std::move(shard_num_col));
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
ActionsDAGPtr getConvertingDAG(const Block & block, const Block & header)
|
||||
{
|
||||
/// Convert header structure to expected.
|
||||
/// Also we ignore constants from result and replace it with constants from header.
|
||||
/// It is needed for functions like `now64()` or `randConstant()` because their values may be different.
|
||||
return ActionsDAG::makeConvertingActions(
|
||||
block.getColumnsWithTypeAndName(),
|
||||
header.getColumnsWithTypeAndName(),
|
||||
ActionsDAG::MatchColumnsMode::Name,
|
||||
true);
|
||||
}
|
||||
|
||||
void addConvertingActions(QueryPlan & plan, const Block & header)
|
||||
{
|
||||
if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header))
|
||||
return;
|
||||
|
||||
auto convert_actions_dag = getConvertingDAG(plan.getCurrentDataStream().header, header);
|
||||
auto converting = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), convert_actions_dag);
|
||||
plan.addStep(std::move(converting));
|
||||
}
|
||||
|
||||
void addConvertingActions(Pipe & pipe, const Block & header)
|
||||
{
|
||||
if (blocksHaveEqualStructure(pipe.getHeader(), header))
|
||||
return;
|
||||
|
||||
auto convert_actions = std::make_shared<ExpressionActions>(getConvertingDAG(pipe.getHeader(), header));
|
||||
pipe.addSimpleTransform([&](const Block & cur_header, Pipe::StreamType) -> ProcessorPtr
|
||||
{
|
||||
return std::make_shared<ExpressionTransform>(cur_header, convert_actions);
|
||||
});
|
||||
}
|
||||
|
||||
std::unique_ptr<QueryPlan> createLocalPlan(
|
||||
const ASTPtr & query_ast,
|
||||
const Block & header,
|
||||
@ -86,18 +177,7 @@ std::unique_ptr<QueryPlan> createLocalPlan(
|
||||
InterpreterSelectQuery interpreter(query_ast, context, SelectQueryOptions(processed_stage));
|
||||
interpreter.buildQueryPlan(*query_plan);
|
||||
|
||||
/// Convert header structure to expected.
|
||||
/// Also we ignore constants from result and replace it with constants from header.
|
||||
/// It is needed for functions like `now64()` or `randConstant()` because their values may be different.
|
||||
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
|
||||
query_plan->getCurrentDataStream().header.getColumnsWithTypeAndName(),
|
||||
header.getColumnsWithTypeAndName(),
|
||||
ActionsDAG::MatchColumnsMode::Name,
|
||||
true);
|
||||
|
||||
auto converting = std::make_unique<ExpressionStep>(query_plan->getCurrentDataStream(), convert_actions_dag);
|
||||
converting->setStepDescription("Convert block structure for query from local replica");
|
||||
query_plan->addStep(std::move(converting));
|
||||
addConvertingActions(*query_plan, header);
|
||||
|
||||
return query_plan;
|
||||
}
|
||||
@ -134,12 +214,25 @@ void SelectStreamFactory::createForShard(
|
||||
}
|
||||
|
||||
auto modified_query_ast = query_ast->clone();
|
||||
auto modified_header = header;
|
||||
if (has_virtual_shard_num_column)
|
||||
{
|
||||
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num, "toUInt32");
|
||||
auto shard_num_constants = evaluateConstantGroupByKeysWithShardNumber(context, query_ast, modified_header, shard_info.shard_num);
|
||||
|
||||
for (auto & col : shard_num_constants)
|
||||
{
|
||||
if (modified_header.has(col.name))
|
||||
modified_header.getByName(col.name).column = std::move(col.column);
|
||||
else
|
||||
modified_header.insert(std::move(col));
|
||||
}
|
||||
}
|
||||
|
||||
auto emplace_local_stream = [&]()
|
||||
{
|
||||
plans.emplace_back(createLocalPlan(modified_query_ast, header, context, processed_stage));
|
||||
plans.emplace_back(createLocalPlan(modified_query_ast, modified_header, context, processed_stage));
|
||||
addConvertingActions(*plans.back(), header);
|
||||
};
|
||||
|
||||
String modified_query = formattedAST(modified_query_ast);
|
||||
@ -147,7 +240,7 @@ void SelectStreamFactory::createForShard(
|
||||
auto emplace_remote_stream = [&]()
|
||||
{
|
||||
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||
shard_info.pool, modified_query, header, context, throttler, scalars, external_tables, processed_stage);
|
||||
shard_info.pool, modified_query, modified_header, context, throttler, scalars, external_tables, processed_stage);
|
||||
remote_query_executor->setLogger(log);
|
||||
|
||||
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
|
||||
@ -156,6 +249,7 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
remote_pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read));
|
||||
remote_pipes.back().addInterpreterContext(context);
|
||||
addConvertingActions(remote_pipes.back(), header);
|
||||
};
|
||||
|
||||
const auto & settings = context->getSettingsRef();
|
||||
@ -247,7 +341,7 @@ void SelectStreamFactory::createForShard(
|
||||
/// Do it lazily to avoid connecting in the main thread.
|
||||
|
||||
auto lazily_create_stream = [
|
||||
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast,
|
||||
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = modified_header, modified_query_ast,
|
||||
context, throttler,
|
||||
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
|
||||
stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes, async_read]()
|
||||
@ -302,8 +396,9 @@ void SelectStreamFactory::createForShard(
|
||||
}
|
||||
};
|
||||
|
||||
delayed_pipes.emplace_back(createDelayedPipe(header, lazily_create_stream, add_totals, add_extremes));
|
||||
delayed_pipes.emplace_back(createDelayedPipe(modified_header, lazily_create_stream, add_totals, add_extremes));
|
||||
delayed_pipes.back().addInterpreterContext(context);
|
||||
addConvertingActions(delayed_pipes.back(), header);
|
||||
}
|
||||
else
|
||||
emplace_remote_stream();
|
||||
|
@ -0,0 +1,18 @@
|
||||
1 1
|
||||
2 1
|
||||
1 1
|
||||
2 1
|
||||
2 1
|
||||
3 1
|
||||
2 1
|
||||
3 1
|
||||
1 1
|
||||
2 1
|
||||
1 1
|
||||
2 1
|
||||
1
|
||||
2
|
||||
1
|
||||
2
|
||||
1 1
|
||||
2 1
|
@ -0,0 +1,16 @@
|
||||
-- GROUP BY _shard_num
|
||||
SELECT _shard_num, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num ORDER BY _shard_num;
|
||||
SELECT _shard_num s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num ORDER BY _shard_num;
|
||||
|
||||
SELECT _shard_num + 1, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + 1 ORDER BY _shard_num + 1;
|
||||
SELECT _shard_num + 1 s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + 1 ORDER BY _shard_num + 1;
|
||||
|
||||
SELECT _shard_num + dummy, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + dummy ORDER BY _shard_num + dummy;
|
||||
SELECT _shard_num + dummy s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + dummy ORDER BY _shard_num + dummy;
|
||||
|
||||
SELECT _shard_num FROM remote('127.0.0.{1,2}', system.one) ORDER BY _shard_num;
|
||||
SELECT _shard_num s FROM remote('127.0.0.{1,2}', system.one) ORDER BY _shard_num;
|
||||
|
||||
SELECT _shard_num s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY s order by s;
|
||||
|
||||
select materialize(_shard_num), * from remote('127.{1,2}', system.one) limit 1 by dummy format Null;
|
Loading…
Reference in New Issue
Block a user