Fix 03031_read_in_order_optimization_with_virtual_row

This commit is contained in:
Nikolai Kochetov 2024-09-26 15:27:57 +00:00
parent 5032b47fb3
commit 7feda9a054
4 changed files with 57 additions and 26 deletions

View File

@ -7,6 +7,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// To carry part level and virtual row if chunk is produced by a merge tree source
class MergeTreeReadInfo : public ChunkInfoCloneable<MergeTreeReadInfo>
{
@ -41,33 +46,49 @@ inline bool isVirtualRow(const Chunk & chunk)
return false;
}
inline void setVirtualRow(Chunk & chunk, bool apply_virtual_row_conversions)
inline void setVirtualRow(Chunk & chunk, const Block & header, bool apply_virtual_row_conversions)
{
auto read_info = chunk.getChunkInfos().extract<MergeTreeReadInfo>();
auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
chassert(read_info);
Block & pk_block = read_info->pk_block;
// std::cerr << apply_virtual_row_conversions << std::endl;
// std::cerr << read_info->virtual_row_conversions->dumpActions() << std::endl;
if (apply_virtual_row_conversions)
read_info->virtual_row_conversions->execute(pk_block);
chunk.setColumns(pk_block.getColumns(), 1);
// std::cerr << "++++" << pk_block.dumpStructure() << std::endl;
// Columns ordered_columns;
// ordered_columns.reserve(pk_block.columns());
Columns ordered_columns;
ordered_columns.reserve(pk_block.columns());
// for (size_t i = 0; i < header.columns(); ++i)
// {
// const ColumnWithTypeAndName & type_and_name = header.getByPosition(i);
// ColumnPtr current_column = type_and_name.type->createColumn();
for (size_t i = 0; i < header.columns(); ++i)
{
const ColumnWithTypeAndName & col = header.getByPosition(i);
if (const auto * pk_col = pk_block.findByName(col.name))
{
if (!col.type->equals(*pk_col->type))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Virtual row has different tupe for {}. Expected {}, got {}",
col.name, col.dumpStructure(), pk_col->dumpStructure());
// size_t pos = type_and_name.name.find_last_of('.');
// String column_name = (pos == String::npos) ? type_and_name.name : type_and_name.name.substr(pos + 1);
ordered_columns.push_back(pk_col->column);
}
else
ordered_columns.push_back(col.type->createColumnConstWithDefaultValue(1));
// const ColumnWithTypeAndName * column = pk_block.findByName(column_name, true);
// ordered_columns.push_back(column ? column->column : current_column->cloneResized(1));
// }
// ColumnPtr current_column = type_and_name.type->createColumn();
// chunk.setColumns(ordered_columns, 1);
// size_t pos = type_and_name.name.find_last_of('.');
// String column_name = (pos == String::npos) ? type_and_name.name : type_and_name.name.substr(pos + 1);
// const ColumnWithTypeAndName * column = pk_block.findByName(column_name, true);
// ordered_columns.push_back(column ? column->column : current_column->cloneResized(1));
}
chunk.setColumns(ordered_columns, 1);
}
}

View File

@ -62,8 +62,7 @@ void MergingSortedAlgorithm::initialize(Inputs inputs)
if (!isVirtualRow(input.chunk))
continue;
setVirtualRow(input.chunk, apply_virtual_row_conversions);
input.skip_last_row = true;
setVirtualRow(input.chunk, header, apply_virtual_row_conversions);
}
removeConstAndSparse(inputs);

View File

@ -339,20 +339,20 @@ void enrichFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
}
}
static const ActionsDAG::Node * addMonotonicChain(ActionsDAG & dag, const ActionsDAG::Node * node, const MatchedTrees::Match * match)
static const ActionsDAG::Node * addMonotonicChain(ActionsDAG & dag, const ActionsDAG::Node * node, const MatchedTrees::Match * match, const std::string & input_name)
{
if (!match->monotonicity)
return &dag.addInput(node->result_name, node->result_type);
return &dag.addInput(input_name, node->result_type);
if (node->type == ActionsDAG::ActionType::ALIAS)
return &dag.addAlias(*addMonotonicChain(dag, node->children.front(), match), node->result_name);
return &dag.addAlias(*addMonotonicChain(dag, node->children.front(), match, input_name), node->result_name);
ActionsDAG::NodeRawConstPtrs args;
args.reserve(node->children.size());
for (const auto * child : node->children)
{
if (child == match->monotonicity->child_node)
args.push_back(addMonotonicChain(dag, match->monotonicity->child_node, match->monotonicity->child_match));
args.push_back(addMonotonicChain(dag, match->monotonicity->child_node, match->monotonicity->child_match, input_name));
else
args.push_back(&dag.addColumn({child->column, child->result_type, child->result_name}));
}
@ -571,15 +571,25 @@ SortingInputOrder buildInputOrderInfo(
{
ActionsDAG virtual_row_dag;
virtual_row_dag.getOutputs().reserve(match_infos.size());
size_t next_pk_name = 0;
for (const auto & info : match_infos)
{
const ActionsDAG::Node * output;
if (info.fixed_column)
output = &virtual_row_dag.addColumn({info.fixed_column->column, info.fixed_column->result_type, info.fixed_column->result_name});
else if (info.monotonic)
output = addMonotonicChain(virtual_row_dag, info.source, info.monotonic);
else
output = &virtual_row_dag.addInput(info.source->result_name, info.source->result_type);
{
if (info.monotonic)
output = addMonotonicChain(virtual_row_dag, info.source, info.monotonic, pk_column_names[next_pk_name]);
else
{
output = &virtual_row_dag.addInput(pk_column_names[next_pk_name], info.source->result_type);
if (pk_column_names[next_pk_name] != info.source->result_name)
output = &virtual_row_dag.addAlias(*output, info.source->result_name);
}
++next_pk_name;
}
virtual_row_dag.getOutputs().push_back(output);
}

View File

@ -701,9 +701,10 @@ Pipe ReadFromMergeTree::readInOrder(
size_t mark_range_begin = part_with_ranges.ranges.front().begin;
ColumnsWithTypeAndName pk_columns;
pk_columns.reserve(index->size());
size_t num_columns = virtual_row_conversion->getSampleBlock().columns();
pk_columns.reserve(num_columns);
for (size_t j = 0; j < index->size(); ++j)
for (size_t j = 0; j < num_columns; ++j)
{
auto column = primary_key.data_types[j]->createColumn()->cloneEmpty();
column->insert((*(*index)[j])[mark_range_begin]);