Try to fix GROUP BY _shard_num

This commit is contained in:
Nikolai Kochetov 2021-05-18 18:17:19 +03:00
parent 64e7994987
commit 57f5e33464
4 changed files with 146 additions and 21 deletions

View File

@ -1041,11 +1041,19 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
{
auto & input = inputs[res_elem.name];
if (input.empty())
throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);
src_node = dst_node = actions_dag->inputs[input.front()];
input.pop_front();
{
const auto * res_const = typeid_cast<const ColumnConst *>(res_elem.column.get());
if (ignore_constant_values && res_const)
src_node = dst_node = &actions_dag->addColumn(res_elem);
else
throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);
}
else
{
src_node = dst_node = actions_dag->inputs[input.front()];
input.pop_front();
}
break;
}
}

View File

@ -8,11 +8,13 @@
#include <Common/checkStackSize.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <common/logger_useful.h>
#include <Processors/Pipe.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/DelayedSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@ -73,6 +75,89 @@ SelectStreamFactory::SelectStreamFactory(
namespace
{
/// Special support for the case when `_shard_num` column is used in GROUP BY key expression.
/// This column is constant is a constant for shard.
/// Constant expression with this colum may be removed from intermediate header.
/// However, this column is not constant for initiator, and it expect intermediate header has it.
///
/// To fix it, the follwing trick is applied.
/// We check all GROUP BY keys which depend only on `_shard_num`.
/// Calculate such expression for curren shard if it is used in header.
/// Those columns will be added to modified header as already known constants.
///
/// For local shard, missed constants will be added by converting actions.
/// For remote shard, RemoteQueryExecutor will automatically add missing constant.
Block evaluateConstantGroupByKeysWithShardNumber(
const ContextPtr & context, const ASTPtr & query_ast, const Block & header, UInt32 shard_num)
{
Block res;
if (auto group_by = query_ast->as<ASTSelectQuery &>().groupBy())
{
for (const auto & elem : group_by->children)
{
String key_name = elem->getColumnName();
if (header.has(key_name))
{
auto ast = elem->clone();
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(ast);
auto required_columns = columns_context.requiredColumns();
if (required_columns.size() != 1 || required_columns.count("_shard_num") == 0)
continue;
auto type = std::make_shared<DataTypeUInt32>();
auto col = type->createColumnConst(1, shard_num);
std::string name = "_shard_num";
Block block({ColumnWithTypeAndName{col, type, name}});
auto syntax_result = TreeRewriter(context).analyze(ast, {NameAndTypePair{name, type}});
ExpressionAnalyzer(ast, syntax_result, context).getActions(true)->execute(block);
res.insert(block.getByName(key_name));
}
}
}
return res;
}
ActionsDAGPtr getConvertingDAG(const Block & block, const Block & header)
{
/// Convert header structure to expected.
/// Also we ignore constants from result and replace it with constants from header.
/// It is needed for functions like `now64()` or `randConstant()` because their values may be different.
return ActionsDAG::makeConvertingActions(
block.getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
true);
}
void addConvertingActions(QueryPlan & plan, const Block & header)
{
if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header))
return;
auto convert_actions_dag = getConvertingDAG(plan.getCurrentDataStream().header, header);
auto converting = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), convert_actions_dag);
plan.addStep(std::move(converting));
}
void addConvertingActions(Pipe & pipe, const Block & header)
{
if (blocksHaveEqualStructure(pipe.getHeader(), header))
return;
auto convert_actions = std::make_shared<ExpressionActions>(getConvertingDAG(pipe.getHeader(), header));
pipe.addSimpleTransform([&](const Block & cur_header, Pipe::StreamType) -> ProcessorPtr
{
return std::make_shared<ExpressionTransform>(cur_header, convert_actions);
});
}
std::unique_ptr<QueryPlan> createLocalPlan(
const ASTPtr & query_ast,
const Block & header,
@ -86,18 +171,7 @@ std::unique_ptr<QueryPlan> createLocalPlan(
InterpreterSelectQuery interpreter(query_ast, context, SelectQueryOptions(processed_stage));
interpreter.buildQueryPlan(*query_plan);
/// Convert header structure to expected.
/// Also we ignore constants from result and replace it with constants from header.
/// It is needed for functions like `now64()` or `randConstant()` because their values may be different.
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan->getCurrentDataStream().header.getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
true);
auto converting = std::make_unique<ExpressionStep>(query_plan->getCurrentDataStream(), convert_actions_dag);
converting->setStepDescription("Convert block structure for query from local replica");
query_plan->addStep(std::move(converting));
addConvertingActions(*query_plan, header);
return query_plan;
}
@ -134,12 +208,25 @@ void SelectStreamFactory::createForShard(
}
auto modified_query_ast = query_ast->clone();
auto modified_header = header;
if (has_virtual_shard_num_column)
{
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num, "toUInt32");
auto shard_num_constants = evaluateConstantGroupByKeysWithShardNumber(context, query_ast, modified_header, shard_info.shard_num);
for (auto & col : shard_num_constants)
{
if (modified_header.has(col.name))
modified_header.getByName(col.name).column = std::move(col.column);
else
modified_header.insert(std::move(col));
}
}
auto emplace_local_stream = [&]()
{
plans.emplace_back(createLocalPlan(modified_query_ast, header, context, processed_stage));
plans.emplace_back(createLocalPlan(modified_query_ast, modified_header, context, processed_stage));
addConvertingActions(*plans.back(), header);
};
String modified_query = formattedAST(modified_query_ast);
@ -147,7 +234,7 @@ void SelectStreamFactory::createForShard(
auto emplace_remote_stream = [&]()
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard_info.pool, modified_query, header, context, throttler, scalars, external_tables, processed_stage);
shard_info.pool, modified_query, modified_header, context, throttler, scalars, external_tables, processed_stage);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
@ -156,6 +243,7 @@ void SelectStreamFactory::createForShard(
remote_pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read));
remote_pipes.back().addInterpreterContext(context);
addConvertingActions(remote_pipes.back(), header);
};
const auto & settings = context->getSettingsRef();
@ -247,7 +335,7 @@ void SelectStreamFactory::createForShard(
/// Do it lazily to avoid connecting in the main thread.
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast,
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = modified_header, modified_query_ast,
context, throttler,
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes, async_read]()
@ -302,8 +390,9 @@ void SelectStreamFactory::createForShard(
}
};
delayed_pipes.emplace_back(createDelayedPipe(header, lazily_create_stream, add_totals, add_extremes));
delayed_pipes.emplace_back(createDelayedPipe(modified_header, lazily_create_stream, add_totals, add_extremes));
delayed_pipes.back().addInterpreterContext(context);
addConvertingActions(delayed_pipes.back(), header);
}
else
emplace_remote_stream();

View File

@ -0,0 +1,16 @@
1 1
2 1
1 1
2 1
2 1
3 1
2 1
3 1
1 1
2 1
1 1
2 1
1
2
1
2

View File

@ -0,0 +1,12 @@
-- GROUP BY _shard_num
SELECT _shard_num, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num ORDER BY _shard_num;
SELECT _shard_num s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num ORDER BY _shard_num;
SELECT _shard_num + 1, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + 1 ORDER BY _shard_num + 1;
SELECT _shard_num + 1 s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + 1 ORDER BY _shard_num + 1;
SELECT _shard_num + dummy, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + dummy ORDER BY _shard_num + dummy;
SELECT _shard_num + dummy s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num + dummy ORDER BY _shard_num + dummy;
SELECT _shard_num FROM remote('127.0.0.{1,2}', system.one) ORDER BY _shard_num;
SELECT _shard_num s FROM remote('127.0.0.{1,2}', system.one) ORDER BY _shard_num;