Preserve order of virtual columns in the header and in data chunks when adding _part_offset together with other virtual columns

Alexander Gololobov 2022-04-11 15:43:09 +02:00
parent f730ff9805
commit c484a1c269
4 changed files with 84 additions and 62 deletions
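
The commit enforces one ordering rule for both the header and every data chunk: physical columns first, then virtual columns filled per row by the range reader (currently only _part_offset), then virtual columns that are constant for the whole part. Below is a minimal standalone sketch of that rule, not ClickHouse code; the physical column names and the extra virtuals (_part, _partition_id) are illustrative.

#include <iostream>
#include <string>
#include <vector>

int main()
{
    const std::vector<std::string> physical = {"key", "value"};
    const std::vector<std::string> requested_virtuals = {"_part", "_part_offset", "_partition_id"};

    /// Split the requested virtual columns the same way the new constructor code does:
    /// _part_offset is produced row-by-row by the range reader, the others are constant per part.
    std::vector<std::string> non_const_virtuals;
    std::vector<std::string> part_const_virtuals;
    for (const auto & name : requested_virtuals)
    {
        if (name == "_part_offset")
            non_const_virtuals.push_back(name);
        else
            part_const_virtuals.push_back(name);
    }

    /// The header and every data chunk must list columns in exactly this order,
    /// otherwise positions in the chunk no longer match the header.
    std::vector<std::string> layout = physical;
    layout.insert(layout.end(), non_const_virtuals.begin(), non_const_virtuals.end());
    layout.insert(layout.end(), part_const_virtuals.begin(), part_const_virtuals.end());

    for (const auto & name : layout)
        std::cout << name << ' ';
    std::cout << '\n'; /// key value _part_offset _part _partition_id
}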

src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp

@@ -54,12 +54,20 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
{
header_without_virtual_columns = getPort().getHeader();
/// Reverse order is to minimize reallocations when removing columns from the block
for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it)
if (header_without_virtual_columns.has(*it))
header_without_virtual_columns.erase(*it);
if (std::find(virt_column_names.begin(), virt_column_names.end(), "_part_offset") != virt_column_names.end())
add_part_offset = true;
{
if (*it == "_part_offset")
{
non_const_virtual_column_names.emplace_back(*it);
}
else
{
/// Remove virtual columns that are going to be filled with const values
if (header_without_virtual_columns.has(*it))
header_without_virtual_columns.erase(*it);
}
}
if (prewhere_info)
{
@@ -202,23 +210,23 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu
{
if (reader->getColumns().empty())
{
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), true, add_part_offset);
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), true, non_const_virtual_column_names);
}
else
{
MergeTreeRangeReader * pre_reader_ptr = nullptr;
if (pre_reader != nullptr)
{
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), false, add_part_offset);
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), false, non_const_virtual_column_names);
pre_reader_ptr = &current_task.pre_range_reader;
}
current_task.range_reader = MergeTreeRangeReader(reader.get(), pre_reader_ptr, nullptr, true, add_part_offset);
current_task.range_reader = MergeTreeRangeReader(reader.get(), pre_reader_ptr, nullptr, true, non_const_virtual_column_names);
}
}
else
{
current_task.range_reader = MergeTreeRangeReader(reader.get(), nullptr, nullptr, true, add_part_offset);
current_task.range_reader = MergeTreeRangeReader(reader.get(), nullptr, nullptr, true, non_const_virtual_column_names);
}
}
@@ -311,12 +319,6 @@ Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
ordered_columns.emplace_back(std::move(read_result.columns[pos_in_sample_block]));
}
if (add_part_offset)
{
auto pos_in_sample_block = sample_block.getPositionByName("_part_offset");
ordered_columns.emplace_back(std::move(read_result.columns[pos_in_sample_block]));
}
return Chunk(std::move(ordered_columns), read_result.num_rows);
}
@@ -350,7 +352,25 @@ namespace
};
}
static void injectVirtualColumnsImpl(
/// Adds virtual columns that are not const for all rows
static void injectNonConstVirtualColumns(
size_t rows,
VirtualColumnsInserter & inserter,
const Names & virtual_columns)
{
if (unlikely(rows))
throw Exception("Cannot insert non-constant virtual column to non-empty chunk.",
ErrorCodes::LOGICAL_ERROR);
for (const auto & virtual_column_name : virtual_columns)
{
if (virtual_column_name == "_part_offset")
inserter.insertUInt64Column(DataTypeUInt64().createColumn(), virtual_column_name);
}
}
/// Adds virtual columns that are const for the whole part
static void injectPartConstVirtualColumns(
size_t rows,
VirtualColumnsInserter & inserter,
MergeTreeReadTask * task,
@@ -511,20 +531,11 @@ void MergeTreeBaseSelectProcessor::injectVirtualColumns(
Block & block, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
VirtualColumnsInserterIntoBlock inserter{block};
injectVirtualColumnsImpl(block.rows(), inserter, task, partition_value_type, virtual_columns);
/// injectVirtualColumns is used to get header of a block, so we patch it here. But when generating data, it
/// is not a constant value, so we do not put it in injectVirtualColumnsImpl.
if (std::find(virtual_columns.begin(), virtual_columns.end(), "_part_offset") != virtual_columns.end())
{
ColumnPtr column;
if (block.rows())
column = DataTypeUInt64().createColumnConst(block.rows(), 0)->convertToFullColumnIfConst();
else
column = DataTypeUInt64().createColumn();
inserter.insertUInt64Column(column, "_part_offset");
}
/// First add non-const columns that are filled by the range reader and then const columns that we will fill ourselves.
/// Note that the order is important: virtual columns filled by the range reader must go first
injectNonConstVirtualColumns(block.rows(), inserter, virtual_columns);
injectPartConstVirtualColumns(block.rows(), inserter, task, partition_value_type, virtual_columns);
}
void MergeTreeBaseSelectProcessor::injectVirtualColumns(
@@ -534,7 +545,8 @@ void MergeTreeBaseSelectProcessor::injectVirtualColumns(
auto columns = chunk.detachColumns();
VirtualColumnsInserterIntoColumns inserter{columns};
injectVirtualColumnsImpl(num_rows, inserter, task, partition_value_type, virtual_columns);
/// Only add const virtual columns because non-const ones have already been added
injectPartConstVirtualColumns(num_rows, inserter, task, partition_value_type, virtual_columns);
chunk.setColumns(columns, num_rows);
}
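
To make the comments in injectVirtualColumns above concrete, here is a simplified standalone sketch of the two injection paths, using plain std::vector<std::string> instead of Block/Chunk and assumed column names; it mirrors the intent of injectNonConstVirtualColumns/injectPartConstVirtualColumns but is not the actual implementation. The header receives an empty _part_offset placeholder followed by the part-const virtual columns, while a data chunk receives only the part-const ones, because the range reader has already produced _part_offset for real rows.

#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

using Names = std::vector<std::string>;

void injectNonConstVirtualColumns(std::size_t rows, Names & out, const Names & virtuals)
{
    /// Non-const virtual columns can only be added as empty placeholders (header case).
    if (rows != 0)
        throw std::logic_error("Cannot insert non-const virtual column into a non-empty chunk");
    for (const auto & name : virtuals)
        if (name == "_part_offset")
            out.push_back(name);
}

void injectPartConstVirtualColumns(std::size_t /*rows*/, Names & out, const Names & virtuals)
{
    /// These columns hold a single value for the whole part, so they can be appended to any chunk.
    for (const auto & name : virtuals)
        if (name != "_part_offset")
            out.push_back(name);
}

int main()
{
    const Names virtuals = {"_part", "_part_offset", "_partition_id"};

    /// Header path: non-const placeholders first, then const columns -- the order matters.
    Names header = {"key", "value"};
    injectNonConstVirtualColumns(0, header, virtuals);
    injectPartConstVirtualColumns(0, header, virtuals);

    /// Data path: the range reader already appended _part_offset, only const columns are added.
    Names chunk = {"key", "value", "_part_offset"};
    injectPartConstVirtualColumns(100, chunk, virtuals);

    for (const auto & name : header) std::cout << name << ' ';
    std::cout << '\n';
    for (const auto & name : chunk) std::cout << name << ' ';
    std::cout << '\n'; /// both lines print the same column order
}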

src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h

@@ -103,7 +103,8 @@ protected:
Names virt_column_names;
bool add_part_offset = false;
/// These columns will be filled by the merge tree range reader
Names non_const_virtual_column_names;
DataTypePtr partition_value_type;

src/Storages/MergeTree/MergeTreeRangeReader.cpp

@@ -538,14 +538,14 @@ MergeTreeRangeReader::MergeTreeRangeReader(
MergeTreeRangeReader * prev_reader_,
const PrewhereExprInfo * prewhere_info_,
bool last_reader_in_chain_,
bool add_part_offset_)
const Names & non_const_virtual_column_names_)
: merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, prev_reader(prev_reader_)
, prewhere_info(prewhere_info_)
, last_reader_in_chain(last_reader_in_chain_)
, is_initialized(true)
, add_part_offset(add_part_offset_)
, non_const_virtual_column_names(non_const_virtual_column_names_)
{
if (prev_reader)
sample_block = prev_reader->getSampleBlock();
@@ -553,10 +553,13 @@ MergeTreeRangeReader::MergeTreeRangeReader(
for (const auto & name_and_type : merge_tree_reader->getColumns())
sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
if(add_part_offset && !sample_block.has("_part_offset"))
for (const auto & column_name : non_const_virtual_column_names)
{
ColumnPtr column(ColumnUInt64::create());
sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeUInt64>(), "_part_offset"));
if (sample_block.has(column_name))
continue;
if (column_name == "_part_offset")
sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name));
}
if (prewhere_info)
@@ -748,11 +751,9 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
if (read_result.num_rows)
{
Columns physical_columns;
if (add_part_offset)
physical_columns.assign(read_result.columns.begin(), read_result.columns.begin() + read_result.columns.size() - 1);
else
physical_columns.assign(read_result.columns.begin(), read_result.columns.end());
/// Physical columns go first and then some virtual columns follow
const size_t physical_columns_count = read_result.columns.size() - non_const_virtual_column_names.size();
Columns physical_columns(read_result.columns.begin(), read_result.columns.begin() + physical_columns_count);
bool should_evaluate_missing_defaults;
merge_tree_reader->fillMissingColumns(physical_columns, should_evaluate_missing_defaults,
@@ -794,12 +795,15 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
size_t current_task_last_mark = getLastMark(ranges);
UInt64 leading_part_offset = 0;
UInt64 leading_last_part_offset = 0;
/// The stream could be unfinished by the previous read request because of max_rows limit.
/// In this case it will have some rows from the previously started range. We need to save their begin and
/// end offsets to properly fill _part_offset column.
UInt64 leading_begin_part_offset = 0;
UInt64 leading_end_part_offset = 0;
if (!stream.isFinished())
{
leading_part_offset = stream.currentPartOffset();
leading_last_part_offset = stream.lastPartOffset();
leading_begin_part_offset = stream.currentPartOffset();
leading_end_part_offset = stream.lastPartOffset();
}
/// Stream is lazy. result.num_added_rows is the number of rows added to block which is not equal to
@@ -838,13 +842,16 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
/// Last granule may be incomplete.
result.adjustLastGranule();
if(add_part_offset)
patchPartOffsetColumn(result, leading_part_offset, leading_last_part_offset);
for (const auto & column_name : non_const_virtual_column_names)
{
if (column_name == "_part_offset")
fillPartOffsetColumn(result, leading_begin_part_offset, leading_end_part_offset);
}
return result;
}
void MergeTreeRangeReader::patchPartOffsetColumn(ReadResult & result, UInt64 leading_part_offset, UInt64 leading_last_part_offset)
void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset)
{
size_t num_rows = result.numReadRows();
@@ -854,18 +861,15 @@ void MergeTreeRangeReader::patchPartOffsetColumn(ReadResult & result, UInt64 lea
UInt64 * pos = vec.data();
UInt64 * end = &vec[num_rows];
if(leading_last_part_offset)
{
while (pos < end && leading_part_offset < leading_last_part_offset)
*pos++ = leading_part_offset++;
}
while (pos < end && leading_begin_part_offset < leading_end_part_offset)
*pos++ = leading_begin_part_offset++;
const auto start_ranges = result.startedRanges();
for (size_t i = 0; i < start_ranges.size(); ++i)
for (const auto & start_range : start_ranges)
{
UInt64 start_part_offset = index_granularity->getMarkStartingRow(start_ranges[i].range.begin);
UInt64 end_part_offset = index_granularity->getMarkStartingRow(start_ranges[i].range.end);
UInt64 start_part_offset = index_granularity->getMarkStartingRow(start_range.range.begin);
UInt64 end_part_offset = index_granularity->getMarkStartingRow(start_range.range.end);
while (pos < end && start_part_offset < end_part_offset)
*pos++ = start_part_offset++;
@@ -990,7 +994,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
const auto & header = merge_tree_reader->getColumns();
size_t num_columns = header.size();
if (result.columns.size() != (add_part_offset ? num_columns + 1 : num_columns))
if (result.columns.size() != (num_columns + non_const_virtual_column_names.size()))
throw Exception("Invalid number of columns passed to MergeTreeRangeReader. "
"Expected " + toString(num_columns) + ", "
"got " + toString(result.columns.size()), ErrorCodes::LOGICAL_ERROR);
@@ -1016,9 +1020,13 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type)
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
if (add_part_offset)
for (const auto & column_name : non_const_virtual_column_names)
{
block.insert({result.columns[pos], std::make_shared<DataTypeUInt64>(), "_part_offset"});
if (column_name == "_part_offset")
block.insert({result.columns[pos], std::make_shared<DataTypeUInt64>(), column_name});
else
throw Exception("Unexpected non-const virtual column: " + column_name, ErrorCodes::LOGICAL_ERROR);
++pos;
}
if (prewhere_info->alias_actions)
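
The fillPartOffsetColumn logic above can be summarized with a small standalone sketch, assuming simplified types (row offsets instead of marks, no ColumnUInt64); it is not the ClickHouse implementation. Offsets are taken first from the range left unfinished by the previous read() call and then from each mark range started during the current call.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

/// Row offsets covered by a range started during the current read() call.
struct StartedRange { uint64_t begin_row = 0; uint64_t end_row = 0; };

std::vector<uint64_t> fillPartOffsets(
    std::size_t num_rows,
    uint64_t leading_begin_part_offset, /// rows carried over from the range cut short by max_rows
    uint64_t leading_end_part_offset,
    const std::vector<StartedRange> & started_ranges)
{
    std::vector<uint64_t> offsets(num_rows);
    uint64_t * pos = offsets.data();
    uint64_t * end = pos + num_rows;

    /// Rows that belong to the range started by a previous read() and left unfinished.
    while (pos < end && leading_begin_part_offset < leading_end_part_offset)
        *pos++ = leading_begin_part_offset++;

    /// Rows from ranges started by this read(), in order.
    for (const auto & range : started_ranges)
    {
        uint64_t offset = range.begin_row;
        while (pos < end && offset < range.end_row)
            *pos++ = offset++;
    }
    return offsets;
}

int main()
{
    /// 3 leftover rows (offsets 5..7), then a new range covering rows 16..19.
    for (uint64_t value : fillPartOffsets(7, 5, 8, {{16, 20}}))
        std::cout << value << ' ';
    std::cout << '\n'; /// prints: 5 6 7 16 17 18 19
}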

src/Storages/MergeTree/MergeTreeRangeReader.h

@@ -45,7 +45,7 @@ public:
MergeTreeRangeReader * prev_reader_,
const PrewhereExprInfo * prewhere_info_,
bool last_reader_in_chain_,
bool add_part_offset);
const Names & non_const_virtual_column_names);
MergeTreeRangeReader() = default;
@@ -59,6 +59,7 @@ public:
bool isCurrentRangeFinished() const;
bool isInitialized() const { return is_initialized; }
/// Accumulates sequential read() requests to perform a large read instead of multiple small reads
class DelayedStream
{
public:
@@ -239,7 +240,7 @@ private:
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
Columns continueReadingChain(ReadResult & result, size_t & num_rows);
void executePrewhereActionsAndFilterColumns(ReadResult & result);
void patchPartOffsetColumn(ReadResult & result, UInt64 leading_part_offset, UInt64 leading_rows);
void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
@@ -252,7 +253,7 @@ private:
bool last_reader_in_chain = false;
bool is_initialized = false;
bool add_part_offset = false;
Names non_const_virtual_column_names;
};
}