diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp index 65a676489c7..f4295825d63 100644 --- a/src/Interpreters/ActionsDAG.cpp +++ b/src/Interpreters/ActionsDAG.cpp @@ -1041,11 +1041,19 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions( { auto & input = inputs[res_elem.name]; if (input.empty()) - throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream", - ErrorCodes::THERE_IS_NO_COLUMN); - - src_node = dst_node = actions_dag->inputs[input.front()]; - input.pop_front(); + { + const auto * res_const = typeid_cast(res_elem.column.get()); + if (ignore_constant_values && res_const) + src_node = dst_node = &actions_dag->addColumn(res_elem); + else + throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream", + ErrorCodes::THERE_IS_NO_COLUMN); + } + else + { + src_node = dst_node = actions_dag->inputs[input.front()]; + input.pop_front(); + } break; } } diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 7cb55f32162..d23271a7147 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -8,11 +8,13 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -73,6 +75,89 @@ SelectStreamFactory::SelectStreamFactory( namespace { +/// Special support for the case when `_shard_num` column is used in GROUP BY key expression. +/// This column is constant is a constant for shard. +/// Constant expression with this colum may be removed from intermediate header. +/// However, this column is not constant for initiator, and it expect intermediate header has it. +/// +/// To fix it, the follwing trick is applied. +/// We check all GROUP BY keys which depend only on `_shard_num`. +/// Calculate such expression for curren shard if it is used in header. +/// Those columns will be added to modified header as already known constants. +/// +/// For local shard, missed constants will be added by converting actions. +/// For remote shard, RemoteQueryExecutor will automatically add missing constant. +Block evaluateConstantGroupByKeysWithShardNumber( + const ContextPtr & context, const ASTPtr & query_ast, const Block & header, UInt32 shard_num) +{ + Block res; + + if (auto group_by = query_ast->as().groupBy()) + { + for (const auto & elem : group_by->children) + { + String key_name = elem->getColumnName(); + if (header.has(key_name)) + { + auto ast = elem->clone(); + + RequiredSourceColumnsVisitor::Data columns_context; + RequiredSourceColumnsVisitor(columns_context).visit(ast); + + auto required_columns = columns_context.requiredColumns(); + if (required_columns.size() != 1 || required_columns.count("_shard_num") == 0) + continue; + + auto type = std::make_shared(); + auto col = type->createColumnConst(1, shard_num); + std::string name = "_shard_num"; + + Block block({ColumnWithTypeAndName{col, type, name}}); + auto syntax_result = TreeRewriter(context).analyze(ast, {NameAndTypePair{name, type}}); + ExpressionAnalyzer(ast, syntax_result, context).getActions(true)->execute(block); + + res.insert(block.getByName(key_name)); + } + } + } + + return res; +} + +ActionsDAGPtr getConvertingDAG(const Block & block, const Block & header) +{ + /// Convert header structure to expected. + /// Also we ignore constants from result and replace it with constants from header. + /// It is needed for functions like `now64()` or `randConstant()` because their values may be different. + return ActionsDAG::makeConvertingActions( + block.getColumnsWithTypeAndName(), + header.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Name, + true); +} + +void addConvertingActions(QueryPlan & plan, const Block & header) +{ + if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header)) + return; + + auto convert_actions_dag = getConvertingDAG(plan.getCurrentDataStream().header, header); + auto converting = std::make_unique(plan.getCurrentDataStream(), convert_actions_dag); + plan.addStep(std::move(converting)); +} + +void addConvertingActions(Pipe & pipe, const Block & header) +{ + if (blocksHaveEqualStructure(pipe.getHeader(), header)) + return; + + auto convert_actions = std::make_shared(getConvertingDAG(pipe.getHeader(), header)); + pipe.addSimpleTransform([&](const Block & cur_header, Pipe::StreamType) -> ProcessorPtr + { + return std::make_shared(cur_header, convert_actions); + }); +} + std::unique_ptr createLocalPlan( const ASTPtr & query_ast, const Block & header, @@ -86,18 +171,7 @@ std::unique_ptr createLocalPlan( InterpreterSelectQuery interpreter(query_ast, context, SelectQueryOptions(processed_stage)); interpreter.buildQueryPlan(*query_plan); - /// Convert header structure to expected. - /// Also we ignore constants from result and replace it with constants from header. - /// It is needed for functions like `now64()` or `randConstant()` because their values may be different. - auto convert_actions_dag = ActionsDAG::makeConvertingActions( - query_plan->getCurrentDataStream().header.getColumnsWithTypeAndName(), - header.getColumnsWithTypeAndName(), - ActionsDAG::MatchColumnsMode::Name, - true); - - auto converting = std::make_unique(query_plan->getCurrentDataStream(), convert_actions_dag); - converting->setStepDescription("Convert block structure for query from local replica"); - query_plan->addStep(std::move(converting)); + addConvertingActions(*query_plan, header); return query_plan; } @@ -134,12 +208,25 @@ void SelectStreamFactory::createForShard( } auto modified_query_ast = query_ast->clone(); + auto modified_header = header; if (has_virtual_shard_num_column) + { VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num, "toUInt32"); + auto shard_num_constants = evaluateConstantGroupByKeysWithShardNumber(context, query_ast, modified_header, shard_info.shard_num); + + for (auto & col : shard_num_constants) + { + if (modified_header.has(col.name)) + modified_header.getByName(col.name).column = std::move(col.column); + else + modified_header.insert(std::move(col)); + } + } auto emplace_local_stream = [&]() { - plans.emplace_back(createLocalPlan(modified_query_ast, header, context, processed_stage)); + plans.emplace_back(createLocalPlan(modified_query_ast, modified_header, context, processed_stage)); + addConvertingActions(*plans.back(), header); }; String modified_query = formattedAST(modified_query_ast); @@ -147,7 +234,7 @@ void SelectStreamFactory::createForShard( auto emplace_remote_stream = [&]() { auto remote_query_executor = std::make_shared( - shard_info.pool, modified_query, header, context, throttler, scalars, external_tables, processed_stage); + shard_info.pool, modified_query, modified_header, context, throttler, scalars, external_tables, processed_stage); remote_query_executor->setLogger(log); remote_query_executor->setPoolMode(PoolMode::GET_MANY); @@ -156,6 +243,7 @@ void SelectStreamFactory::createForShard( remote_pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read)); remote_pipes.back().addInterpreterContext(context); + addConvertingActions(remote_pipes.back(), header); }; const auto & settings = context->getSettingsRef(); @@ -247,7 +335,7 @@ void SelectStreamFactory::createForShard( /// Do it lazily to avoid connecting in the main thread. auto lazily_create_stream = [ - pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, + pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = modified_header, modified_query_ast, context, throttler, main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables, stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes, async_read]() @@ -302,8 +390,9 @@ void SelectStreamFactory::createForShard( } }; - delayed_pipes.emplace_back(createDelayedPipe(header, lazily_create_stream, add_totals, add_extremes)); + delayed_pipes.emplace_back(createDelayedPipe(modified_header, lazily_create_stream, add_totals, add_extremes)); delayed_pipes.back().addInterpreterContext(context); + addConvertingActions(delayed_pipes.back(), header); } else emplace_remote_stream(); diff --git a/tests/queries/0_stateless/01860_Distributed__shard_num_GROUP_BY.reference b/tests/queries/0_stateless/01860_Distributed__shard_num_GROUP_BY.reference new file mode 100644 index 00000000000..decc92f3ac0 --- /dev/null +++ b/tests/queries/0_stateless/01860_Distributed__shard_num_GROUP_BY.reference @@ -0,0 +1,16 @@ +1 1 +2 1 +1 1 +2 1 +2 1 +3 1 +2 1 +3 1 +1 1 +2 1 +1 1 +2 1 +1 +2 +1 +2 diff --git a/tests/queries/0_stateless/01860_Distributed__shard_num_GROUP_BY.sql b/tests/queries/0_stateless/01860_Distributed__shard_num_GROUP_BY.sql new file mode 100644 index 00000000000..05b82ce4064 --- /dev/null +++ b/tests/queries/0_stateless/01860_Distributed__shard_num_GROUP_BY.sql @@ -0,0 +1,12 @@ +-- GROUP BY _shard_num +SELECT _shard_num, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num ORDER BY _shard_num; +SELECT _shard_num s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num ORDER BY _shard_num; + +SELECT _shard_num + 1, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + 1 ORDER BY _shard_num + 1; +SELECT _shard_num + 1 s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + 1 ORDER BY _shard_num + 1; + +SELECT _shard_num + dummy, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + dummy ORDER BY _shard_num + dummy; +SELECT _shard_num + dummy s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + dummy ORDER BY _shard_num + dummy; + +SELECT _shard_num FROM remote('127.0.0.{1,2}', system.one) ORDER BY _shard_num; +SELECT _shard_num s FROM remote('127.0.0.{1,2}', system.one) ORDER BY _shard_num;