Introduce virtual row conversions.

This commit is contained in:
Nikolai Kochetov 2024-09-26 12:20:51 +00:00
parent 10ed5a8521
commit bf591fa12b
17 changed files with 226 additions and 125 deletions

View File

@ -2,6 +2,7 @@
#include <Processors/Chunk.h>
#include <Core/Block.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
@ -13,14 +14,15 @@ public:
MergeTreeReadInfo() = delete;
explicit MergeTreeReadInfo(size_t part_level) :
origin_merge_tree_part_level(part_level) {}
explicit MergeTreeReadInfo(size_t part_level, const Block & pk_block_) :
origin_merge_tree_part_level(part_level), pk_block(pk_block_) {}
explicit MergeTreeReadInfo(size_t part_level, const Block & pk_block_, ExpressionActionsPtr virtual_row_conversions_) :
origin_merge_tree_part_level(part_level), pk_block(pk_block_), virtual_row_conversions(std::move(virtual_row_conversions_)) {}
MergeTreeReadInfo(const MergeTreeReadInfo & other) = default;
size_t origin_merge_tree_part_level = 0;
/// If is virtual_row, block should not be empty.
Block pk_block;
ExpressionActionsPtr virtual_row_conversions;
};
inline size_t getPartLevelFromChunk(const Chunk & chunk)
@ -39,29 +41,33 @@ inline bool isVirtualRow(const Chunk & chunk)
return false;
}
inline void setVirtualRow(Chunk & chunk, const Block & header)
inline void setVirtualRow(Chunk & chunk, bool apply_virtual_row_conversions)
{
const auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
auto read_info = chunk.getChunkInfos().extract<MergeTreeReadInfo>();
chassert(read_info);
const Block & pk_block = read_info->pk_block;
Block & pk_block = read_info->pk_block;
if (apply_virtual_row_conversions)
read_info->virtual_row_conversions->execute(pk_block);
Columns ordered_columns;
ordered_columns.reserve(header.columns());
chunk.setColumns(pk_block.getColumns(), 1);
for (size_t i = 0; i < header.columns(); ++i)
{
const ColumnWithTypeAndName & type_and_name = header.getByPosition(i);
ColumnPtr current_column = type_and_name.type->createColumn();
// Columns ordered_columns;
// ordered_columns.reserve(pk_block.columns());
size_t pos = type_and_name.name.find_last_of('.');
String column_name = (pos == String::npos) ? type_and_name.name : type_and_name.name.substr(pos + 1);
// for (size_t i = 0; i < header.columns(); ++i)
// {
// const ColumnWithTypeAndName & type_and_name = header.getByPosition(i);
// ColumnPtr current_column = type_and_name.type->createColumn();
const ColumnWithTypeAndName * column = pk_block.findByName(column_name, true);
ordered_columns.push_back(column ? column->column : current_column->cloneResized(1));
}
// size_t pos = type_and_name.name.find_last_of('.');
// String column_name = (pos == String::npos) ? type_and_name.name : type_and_name.name.substr(pos + 1);
chunk.setColumns(ordered_columns, 1);
// const ColumnWithTypeAndName * column = pk_block.findByName(column_name, true);
// ordered_columns.push_back(column ? column->column : current_column->cloneResized(1));
// }
// chunk.setColumns(ordered_columns, 1);
}
}

View File

@ -22,12 +22,14 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
bool use_average_block_sizes,
bool apply_virtual_row_conversions_)
: header(std::move(header_))
, merged_data(use_average_block_sizes, max_block_size_, max_block_size_bytes_)
, description(description_)
, limit(limit_)
, out_row_sources_buf(out_row_sources_buf_)
, apply_virtual_row_conversions(apply_virtual_row_conversions_)
, current_inputs(num_inputs)
, sorting_queue_strategy(sorting_queue_strategy_)
, cursors(num_inputs)
@ -60,7 +62,8 @@ void MergingSortedAlgorithm::initialize(Inputs inputs)
if (!isVirtualRow(input.chunk))
continue;
setVirtualRow(input.chunk, header);
setVirtualRow(input.chunk, apply_virtual_row_conversions);
input.skip_last_row = true;
}
removeConstAndSparse(inputs);

View File

@ -22,7 +22,8 @@ public:
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
bool use_average_block_sizes = false,
bool apply_virtual_row_conversions_ = true);
void addInput();
@ -47,6 +48,8 @@ private:
/// If it is not nullptr then it should be populated during execution
WriteBuffer * out_row_sources_buf = nullptr;
bool apply_virtual_row_conversions;
/// Chunks currently being merged.
Inputs current_inputs;

View File

@ -22,6 +22,7 @@ MergingSortedTransform::MergingSortedTransform(
bool always_read_till_end_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
bool apply_virtual_row_conversions,
bool have_all_inputs_)
: IMergingTransform(
num_inputs,
@ -38,7 +39,8 @@ MergingSortedTransform::MergingSortedTransform(
sorting_queue_strategy,
limit_,
out_row_sources_buf_,
use_average_block_sizes)
use_average_block_sizes,
apply_virtual_row_conversions)
{
}

View File

@ -22,6 +22,7 @@ public:
bool always_read_till_end_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool apply_virtual_row_conversions = true,
bool have_all_inputs_ = true);
String getName() const override { return "MergingSortedTransform"; }

View File

@ -210,6 +210,8 @@ MatchedTrees::Matches matchTrees(const ActionsDAG::NodeRawConstPtrs & inner_dag,
MatchedTrees::Monotonicity monotonicity;
monotonicity.direction *= info.is_positive ? 1 : -1;
monotonicity.strict = info.is_strict;
monotonicity.child_match = &child_match;
monotonicity.child_node = monotonic_child;
if (child_match.monotonicity)
{

View File

@ -22,12 +22,16 @@ namespace DB
/// DAG for PK does not contain aliases and ambiguous nodes.
struct MatchedTrees
{
struct Match;
/// Monotonicity is calculated for monotonic functions chain.
/// Chain is not strict if there is any non-strict monotonic function.
struct Monotonicity
{
int direction = 1;
bool strict = true;
const Match * child_match = nullptr;
const ActionsDAG::Node * child_node = nullptr;
};
struct Match

View File

@ -129,7 +129,7 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
/// update input order info in read_from_merge_tree step
const int direction = 0; /// for DISTINCT direction doesn't matter, ReadFromMergeTree will choose proper one
bool can_read = read_from_merge_tree->requestReadingInOrder(number_of_sorted_distinct_columns, direction, pre_distinct->getLimitHint());
bool can_read = read_from_merge_tree->requestReadingInOrder(number_of_sorted_distinct_columns, direction, pre_distinct->getLimitHint(), {});
if (!can_read)
return 0;

View File

@ -94,17 +94,6 @@ static QueryPlan::Node * findReadingStep(QueryPlan::Node & node, StepStack & bac
return nullptr;
}
static bool checkVirtualRowSupport(const StepStack & backward_path)
{
for (size_t i = 0; i < backward_path.size() - 1; i++)
{
IQueryPlanStep * step = backward_path[i];
if (!typeid_cast<ExpressionStep *>(step) && !typeid_cast<FilterStep *>(step))
return false;
}
return true;
}
void updateStepsDataStreams(StepStack & steps_to_update)
{
/// update data stream's sorting properties for found transforms
@ -338,11 +327,42 @@ void enrichFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
}
}
InputOrderInfoPtr buildInputOrderInfo(
static const ActionsDAG::Node * addMonotonicChain(ActionsDAG & dag, const ActionsDAG::Node * node, const MatchedTrees::Match * match)
{
if (!match->monotonicity)
return &dag.addInput(node->result_name, node->result_type);
if (node->type == ActionsDAG::ActionType::ALIAS)
return &dag.addAlias(*addMonotonicChain(dag, node->children.front(), match), node->result_name);
ActionsDAG::NodeRawConstPtrs args;
args.reserve(node->children.size());
for (const auto * child : node->children)
{
if (child == match->monotonicity->child_node)
args.push_back(addMonotonicChain(dag, match->monotonicity->child_node, match->monotonicity->child_match));
else
args.push_back(&dag.addColumn({child->column, child->result_type, child->result_name}));
}
return &dag.addFunction(node->function_base, std::move(args), {});
}
struct SortingInputOrder
{
InputOrderInfoPtr input_order{};
/// This is needed for virtual row optimization.
/// Convert the PR values to ORDER BY key.
/// If empty, the optimization cannot be applied.
std::optional<ActionsDAG> virtual_row_conversion{};
};
SortingInputOrder buildInputOrderInfo(
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
const SortDescription & description,
const KeyDescription & sorting_key,
const Names & pk_column_names,
size_t limit)
{
//std::cerr << "------- buildInputOrderInfo " << std::endl;
@ -381,7 +401,18 @@ InputOrderInfoPtr buildInputOrderInfo(
int read_direction = 0;
size_t next_description_column = 0;
size_t next_sort_key = 0;
bool first_prefix_fixed = false;
bool can_optimize_virtual_row = true;
struct MatchInfo
{
const ActionsDAG::Node * source = nullptr;
const ActionsDAG::Node * fixed_column = nullptr;
const MatchedTrees::Match * monotonic = nullptr;
};
std::vector<MatchInfo> match_infos;
match_infos.reserve(description.size());
while (next_description_column < description.size() && next_sort_key < sorting_key.column_names.size())
{
@ -424,6 +455,7 @@ InputOrderInfoPtr buildInputOrderInfo(
//std::cerr << "====== (no dag) Found direct match" << std::endl;
match_infos.push_back({.source = sort_column_node});
++next_description_column;
++next_sort_key;
}
@ -452,27 +484,46 @@ InputOrderInfoPtr buildInputOrderInfo(
{
current_direction *= match.monotonicity->direction;
strict_monotonic = match.monotonicity->strict;
match_infos.push_back({.source = sort_node, .monotonic = &match});
}
else
match_infos.push_back({.source = sort_node});
++next_description_column;
++next_sort_key;
}
else if (fixed_key_columns.contains(sort_column_node))
{
if (next_sort_key == 0)
first_prefix_fixed = true;
{
// Disable virtual row optimization.
// For example, when pk is (a,b), a = 1, order by b, virtual row should be
// disabled in the following case:
// 1st part (0, 100), (1, 2), (1, 3), (1, 4)
// 2nd part (0, 100), (1, 2), (1, 3), (1, 4).
can_optimize_virtual_row = true;
}
//std::cerr << "+++++++++ Found fixed key by match" << std::endl;
++next_sort_key;
}
else
{
//std::cerr << "====== Check for fixed const : " << bool(sort_node->column) << " fixed : " << fixed_columns.contains(sort_node) << std::endl;
bool is_fixed_column = sort_node->column || fixed_columns.contains(sort_node);
if (!is_fixed_column)
break;
if (!sort_node->column)
/// Virtual row for fixed column from order by is not supported now.
/// TODO: we can do it for the simple case,
/// But it's better to remove fixed columns from ORDER BY completely, e.g:
/// WHERE x = 42 ORDER BY x, y => WHERE x = 42 ORDER BY y
can_optimize_virtual_row = false;
match_infos.push_back({.source = sort_node, .fixed_column = sort_node});
order_key_prefix_descr.push_back(sort_column_description);
++next_description_column;
}
@ -494,9 +545,36 @@ InputOrderInfoPtr buildInputOrderInfo(
}
if (read_direction == 0 || order_key_prefix_descr.empty())
return nullptr;
return {};
return std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit, first_prefix_fixed);
/// If the prefix description is used, we can't restore the full description from PK value.
/// TODO: partial sort description can be used as well. Implement support later.
if (order_key_prefix_descr.size() < description.size() || pk_column_names.size() < next_sort_key)
can_optimize_virtual_row = false;
auto order_info = std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit);
std::optional<ActionsDAG> virtual_row_conversion;
if (can_optimize_virtual_row)
{
ActionsDAG virtual_row_dag;
virtual_row_dag.getOutputs().reserve(match_infos.size());
for (const auto & info : match_infos)
{
const ActionsDAG::Node * output;
if (info.fixed_column)
output = &virtual_row_dag.addColumn({info.fixed_column->column, info.fixed_column->result_type, info.fixed_column->result_name});
else if (info.monotonic)
output = addMonotonicChain(virtual_row_dag, info.source, info.monotonic);
else
output = &virtual_row_dag.addInput(info.source->result_name, info.source->result_type);
virtual_row_dag.getOutputs().push_back(output);
}
virtual_row_conversion = std::move(virtual_row_dag);
}
return {std::move(order_info), std::move(virtual_row_conversion)};
}
/// We really need three different sort descriptions here.
@ -700,11 +778,11 @@ AggregationInputOrder buildInputOrderInfo(
for (const auto & key : not_matched_group_by_keys)
group_by_sort_description.emplace_back(SortColumnDescription(std::string(key)));
auto input_order = std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, /*read_direction*/ 1, /* limit */ 0, false);
auto input_order = std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, /*read_direction*/ 1, /* limit */ 0);
return { std::move(input_order), std::move(sort_description_for_merging), std::move(group_by_sort_description) };
}
InputOrderInfoPtr buildInputOrderInfo(
SortingInputOrder buildInputOrderInfo(
const ReadFromMergeTree * reading,
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
@ -712,15 +790,17 @@ InputOrderInfoPtr buildInputOrderInfo(
size_t limit)
{
const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
const auto & pk_column_names = reading->getStorageMetadata()->getPrimaryKey().column_names;
return buildInputOrderInfo(
fixed_columns,
dag, description,
sorting_key,
pk_column_names,
limit);
}
InputOrderInfoPtr buildInputOrderInfo(
SortingInputOrder buildInputOrderInfo(
ReadFromMerge * merge,
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
@ -729,28 +809,31 @@ InputOrderInfoPtr buildInputOrderInfo(
{
const auto & tables = merge->getSelectedTables();
InputOrderInfoPtr order_info;
SortingInputOrder order_info;
for (const auto & table : tables)
{
auto storage = std::get<StoragePtr>(table);
const auto & sorting_key = storage->getInMemoryMetadataPtr()->getSortingKey();
auto metadata = storage->getInMemoryMetadataPtr();
const auto & sorting_key = metadata->getSortingKey();
// const auto & pk_column_names = metadata->getPrimaryKey().column_names;
if (sorting_key.column_names.empty())
return nullptr;
return {};
auto table_order_info = buildInputOrderInfo(
fixed_columns,
dag, description,
sorting_key,
{},
limit);
if (!table_order_info)
return nullptr;
if (!table_order_info.input_order)
return {};
if (!order_info)
order_info = table_order_info;
else if (*order_info != *table_order_info)
return nullptr;
if (!order_info.input_order)
order_info = std::move(table_order_info);
else if (*order_info.input_order != *table_order_info.input_order)
return {};
}
return order_info;
@ -830,19 +913,19 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
dag, description,
limit);
if (order_info)
if (order_info.input_order)
{
bool can_read = reading->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
bool can_read = reading->requestReadingInOrder(
order_info.input_order->used_prefix_of_sorting_key_size,
order_info.input_order->direction,
order_info.input_order->limit,
std::move(order_info.virtual_row_conversion));
if (!can_read)
return nullptr;
if (!checkVirtualRowSupport(backward_path))
reading->setVirtualRowStatus(ReadFromMergeTree::VirtualRowStatus::No);
else if (!order_info->first_prefix_fixed)
reading->setVirtualRowStatus(ReadFromMergeTree::VirtualRowStatus::Possible);
}
return order_info;
return order_info.input_order;
}
else if (auto * merge = typeid_cast<ReadFromMerge *>(reading_node->step.get()))
{
@ -852,14 +935,14 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
dag, description,
limit);
if (order_info)
if (order_info.input_order)
{
bool can_read = merge->requestReadingInOrder(order_info);
bool can_read = merge->requestReadingInOrder(order_info.input_order);
if (!can_read)
return nullptr;
}
return order_info;
return order_info.input_order;
}
return nullptr;
@ -893,7 +976,8 @@ AggregationInputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPl
bool can_read = reading->requestReadingInOrder(
order_info.input_order->used_prefix_of_sorting_key_size,
order_info.input_order->direction,
order_info.input_order->limit);
order_info.input_order->limit,
{});
if (!can_read)
return {};
}
@ -1139,7 +1223,7 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
if (order_info)
{
bool can_read = read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
bool can_read = read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit, {});
if (!can_read)
return 0;
sorting->convertToFinishSorting(order_info->sort_description_for_merging, false);

View File

@ -549,8 +549,7 @@ Pipe ReadFromMergeTree::readInOrder(
Names required_columns,
PoolSettings pool_settings,
ReadType read_type,
UInt64 read_limit,
bool enable_current_virtual_row)
UInt64 read_limit)
{
/// For reading in order it makes sense to read only
/// one range per task to reduce number of read rows.
@ -661,7 +660,7 @@ Pipe ReadFromMergeTree::readInOrder(
Pipe pipe(source);
if (enable_current_virtual_row && (read_type == ReadType::InOrder))
if (virtual_row_conversion && (read_type == ReadType::InOrder))
{
const auto & index = part_with_ranges.data_part->getIndex();
const auto & primary_key = storage_snapshot->metadata->primary_key;
@ -681,7 +680,7 @@ Pipe ReadFromMergeTree::readInOrder(
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<VirtualRowTransform>(header, pk_block);
return std::make_shared<VirtualRowTransform>(header, pk_block, virtual_row_conversion);
});
}
@ -729,7 +728,7 @@ Pipe ReadFromMergeTree::read(
if (read_type == ReadType::Default && (max_streams > 1 || checkAllPartsOnRemoteFS(parts_with_range)))
return readFromPool(std::move(parts_with_range), std::move(required_columns), std::move(pool_settings));
auto pipe = readInOrder(parts_with_range, required_columns, pool_settings, read_type, /*limit=*/ 0, false);
auto pipe = readInOrder(parts_with_range, required_columns, pool_settings, read_type, /*limit=*/ 0);
/// Use ConcatProcessor to concat sources together.
/// It is needed to read in parts order (and so in PK order) if single thread is used.
@ -1038,7 +1037,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
/// For parallel replicas the split will be performed on the initiator side.
if (is_parallel_reading_from_replicas)
{
pipes.emplace_back(readInOrder(std::move(parts_with_ranges), column_names, pool_settings, read_type, input_order_info->limit, false));
pipes.emplace_back(readInOrder(std::move(parts_with_ranges), column_names, pool_settings, read_type, input_order_info->limit));
}
else
{
@ -1111,33 +1110,32 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
splitted_parts_and_ranges.emplace_back(std::move(new_parts));
}
bool primary_key_type_supports_virtual_row = true;
const auto & actions = storage_snapshot->metadata->getPrimaryKey().expression->getActions();
for (const auto & action : actions)
{
if (action.node->type != ActionsDAG::ActionType::INPUT)
{
primary_key_type_supports_virtual_row = false;
break;
}
}
// bool primary_key_type_supports_virtual_row = true;
// const auto & actions = storage_snapshot->metadata->getPrimaryKey().expression->getActions();
// for (const auto & action : actions)
// {
// if (action.node->type != ActionsDAG::ActionType::INPUT)
// {
// primary_key_type_supports_virtual_row = false;
// break;
// }
// }
/// If possible in the optimization stage, check whether there are more than one branch.
if (virtual_row_status == VirtualRowStatus::Possible)
virtual_row_status = splitted_parts_and_ranges.size() > 1
|| (splitted_parts_and_ranges.size() == 1 && splitted_parts_and_ranges[0].size() > 1)
? VirtualRowStatus::Yes : VirtualRowStatus::NoConsiderInLogicalPlan;
// /// If possible in the optimization stage, check whether there are more than one branch.
// if (virtual_row_status == VirtualRowStatus::Possible)
// virtual_row_status = splitted_parts_and_ranges.size() > 1
// || (splitted_parts_and_ranges.size() == 1 && splitted_parts_and_ranges[0].size() > 1)
// ? VirtualRowStatus::Yes : VirtualRowStatus::NoConsiderInLogicalPlan;
for (auto && item : splitted_parts_and_ranges)
{
bool enable_current_virtual_row = false;
if (virtual_row_status == VirtualRowStatus::Yes)
enable_current_virtual_row = true;
else if (virtual_row_status == VirtualRowStatus::NoConsiderInLogicalPlan)
enable_current_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1;
// bool enable_current_virtual_row = false;
// if (virtual_row_status == VirtualRowStatus::Yes)
// enable_current_virtual_row = true;
// else if (virtual_row_status == VirtualRowStatus::NoConsiderInLogicalPlan)
// enable_current_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1;
pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit,
enable_current_virtual_row && primary_key_type_supports_virtual_row));
pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit));
}
}
@ -1172,7 +1170,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (pipe.numOutputPorts() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipe.getHeader(), pipe.numOutputPorts(), sort_description, block_size.max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);
pipe.getHeader(), pipe.numOutputPorts(), sort_description, block_size.max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch,
0, false, nullptr, false, /*apply_virtual_row_conversions*/ false);
pipe.addTransform(std::move(transform));
}
@ -1811,7 +1810,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
return std::make_shared<AnalysisResult>(std::move(result));
}
bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t read_limit)
bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t read_limit, std::optional<ActionsDAG> virtual_row_conversion_)
{
/// if dirction is not set, use current one
if (!direction)
@ -1822,7 +1821,7 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
if (direction != 1 && query_info.isFinal())
return false;
query_info.input_order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, read_limit, false);
query_info.input_order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, read_limit);
reader_settings.read_in_order = true;
/// In case or read-in-order, don't create too many reading streams.
@ -1855,6 +1854,9 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
/// Let prefer in-order optimization over vertical FINAL for now
enable_vertical_final = false;
if (virtual_row_conversion_)
virtual_row_conversion = std::make_shared<ExpressionActions>(std::move(*virtual_row_conversion_));
return true;
}
@ -2305,6 +2307,12 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
expression->describeActions(format_settings.out, prefix);
}
}
if (virtual_row_conversion)
{
format_settings.out << prefix << "Virtual row conversions" << '\n';
virtual_row_conversion->describeActions(format_settings.out, prefix);
}
}
void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
@ -2344,6 +2352,9 @@ void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
map.add("Prewhere info", std::move(prewhere_info_map));
}
if (virtual_row_conversion)
map.add("Virtual row conversions", virtual_row_conversion->toTree());
}
void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const

View File

@ -108,14 +108,6 @@ public:
using AnalysisResultPtr = std::shared_ptr<AnalysisResult>;
enum class VirtualRowStatus
{
NoConsiderInLogicalPlan,
Possible,
No,
Yes,
};
ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot_,
@ -195,7 +187,7 @@ public:
StorageMetadataPtr getStorageMetadata() const { return storage_snapshot->metadata; }
/// Returns `false` if requested reading cannot be performed.
bool requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
bool requestReadingInOrder(size_t prefix_size, int direction, size_t limit, std::optional<ActionsDAG> virtual_row_conversion_);
bool readsInOrder() const;
void updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value) override;
@ -218,8 +210,6 @@ public:
void applyFilters(ActionDAGNodes added_filter_nodes) override;
void setVirtualRowStatus(VirtualRowStatus virtual_row_status_) { virtual_row_status = virtual_row_status_; }
private:
int getSortDirection() const
{
@ -262,7 +252,7 @@ private:
Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
Pipe readFromPool(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings);
Pipe readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings);
Pipe readInOrder(RangesInDataParts parts_with_ranges, Names required_columns, PoolSettings pool_settings, ReadType read_type, UInt64 limit, bool enable_current_virtual_row);
Pipe readInOrder(RangesInDataParts parts_with_ranges, Names required_columns, PoolSettings pool_settings, ReadType read_type, UInt64 limit);
Pipe spreadMarkRanges(RangesInDataParts && parts_with_ranges, size_t num_streams, AnalysisResult & result, std::optional<ActionsDAG> & result_projection);
@ -293,7 +283,7 @@ private:
bool enable_vertical_final = false;
bool enable_remove_parts_from_snapshot_optimization = true;
VirtualRowStatus virtual_row_status = VirtualRowStatus::NoConsiderInLogicalPlan;
ExpressionActionsPtr virtual_row_conversion;
std::optional<size_t> number_of_current_replica;
};

View File

@ -187,6 +187,7 @@ void MergeSortingTransform::consume(Chunk chunk)
{
bool have_all_inputs = false;
bool use_average_block_sizes = false;
bool apply_virtual_row = false;
external_merging_sorted = std::make_shared<MergingSortedTransform>(
header_without_constants,
@ -199,6 +200,7 @@ void MergeSortingTransform::consume(Chunk chunk)
/*always_read_till_end_=*/ false,
nullptr,
use_average_block_sizes,
apply_virtual_row,
have_all_inputs);
processors.emplace_back(external_merging_sorted);

View File

@ -9,10 +9,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
VirtualRowTransform::VirtualRowTransform(const Block & header_, const Block & pk_block_)
VirtualRowTransform::VirtualRowTransform(const Block & header_, const Block & pk_block_, ExpressionActionsPtr virtual_row_conversions_)
: IProcessor({header_}, {header_})
, input(inputs.front()), output(outputs.front())
, header(header_), pk_block(pk_block_)
, pk_block(pk_block_)
, virtual_row_conversions(std::move(virtual_row_conversions_))
{
}
@ -86,6 +87,7 @@ void VirtualRowTransform::work()
is_first = false;
Columns empty_columns;
const auto & header = getOutputs().front().getHeader();
empty_columns.reserve(header.columns());
for (size_t i = 0; i < header.columns(); ++i)
{
@ -94,7 +96,7 @@ void VirtualRowTransform::work()
}
current_chunk.setColumns(empty_columns, 0);
current_chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(0, pk_block));
current_chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(0, pk_block, virtual_row_conversions));
}
else
{

View File

@ -11,7 +11,7 @@ namespace DB
class VirtualRowTransform : public IProcessor
{
public:
explicit VirtualRowTransform(const Block & header_, const Block & pk_block_);
explicit VirtualRowTransform(const Block & header_, const Block & pk_block_, ExpressionActionsPtr virtual_row_conversions_);
String getName() const override { return "VirtualRowTransform"; }
@ -28,8 +28,8 @@ private:
bool can_generate = true;
bool is_first = true;
Block header;
Block pk_block;
ExpressionActionsPtr virtual_row_conversions;
};
}

View File

@ -249,7 +249,7 @@ InputOrderInfoPtr ReadInOrderOptimizer::getInputOrderImpl(
if (sort_description_for_merging.empty())
return {};
return std::make_shared<InputOrderInfo>(std::move(sort_description_for_merging), key_pos, read_direction, limit, false);
return std::make_shared<InputOrderInfo>(std::move(sort_description_for_merging), key_pos, read_direction, limit);
}
InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(

View File

@ -119,22 +119,13 @@ struct InputOrderInfo
const int direction;
const UInt64 limit;
/** For virtual row optimization only
* for example, when pk is (a,b), a = 1, order by b, virtual row should be
* disabled in the following case:
* 1st part (0, 100), (1, 2), (1, 3), (1, 4)
* 2nd part (0, 100), (1, 2), (1, 3), (1, 4).
*/
bool first_prefix_fixed;
InputOrderInfo(
const SortDescription & sort_description_for_merging_,
size_t used_prefix_of_sorting_key_size_,
int direction_, UInt64 limit_, bool first_prefix_fixed_)
int direction_, UInt64 limit_)
: sort_description_for_merging(sort_description_for_merging_)
, used_prefix_of_sorting_key_size(used_prefix_of_sorting_key_size_)
, direction(direction_), limit(limit_)
, first_prefix_fixed(first_prefix_fixed_)
{
}

View File

@ -1555,7 +1555,7 @@ bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
auto request_read_in_order = [order_info_](ReadFromMergeTree & read_from_merge_tree)
{
return read_from_merge_tree.requestReadingInOrder(
order_info_->used_prefix_of_sorting_key_size, order_info_->direction, order_info_->limit);
order_info_->used_prefix_of_sorting_key_size, order_info_->direction, order_info_->limit, {});
};
bool ok = true;