mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
add simple virtual row
This commit is contained in:
parent
becbef9e48
commit
72ebd39572
@ -597,7 +597,7 @@ Pipe ReadFromMergeTree::readInOrder(
|
||||
|
||||
processor->addPartLevelToChunk(isQueryWithFinal());
|
||||
|
||||
processor->addVirtualRowToChunk(need_virtual_row);
|
||||
processor->addVirtualRowToChunk(need_virtual_row, part_with_ranges.data_part->getIndex());
|
||||
|
||||
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
|
||||
if (set_rows_approx)
|
||||
|
@ -946,7 +946,7 @@ String addDummyColumnWithRowCount(Block & block, size_t num_rows)
|
||||
}
|
||||
|
||||
|
||||
MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, MarkRanges & ranges, bool add_virtual_row)
|
||||
MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, MarkRanges & ranges)
|
||||
{
|
||||
if (max_rows == 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected at least 1 row to read, got 0.");
|
||||
@ -961,7 +961,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
|
||||
|
||||
if (prev_reader)
|
||||
{
|
||||
read_result = prev_reader->read(max_rows, ranges, add_virtual_row);
|
||||
read_result = prev_reader->read(max_rows, ranges);
|
||||
|
||||
size_t num_read_rows;
|
||||
Columns columns = continueReadingChain(read_result, num_read_rows);
|
||||
@ -1026,15 +1026,8 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
|
||||
}
|
||||
else
|
||||
{
|
||||
// if (add_virtual_row)
|
||||
// {
|
||||
// generate the virtual row
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
read_result = startReadingChain(max_rows, ranges);
|
||||
read_result.num_rows = read_result.numReadRows();
|
||||
// }
|
||||
|
||||
LOG_TEST(log, "First reader returned: {}, requested columns: {}",
|
||||
read_result.dumpInfo(), dumpNames(merge_tree_reader->getColumns()));
|
||||
@ -1069,11 +1062,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
|
||||
read_result.addNumBytesRead(total_bytes);
|
||||
}
|
||||
|
||||
/// If add_virtual_row is enabled, don't turn on prewhere so that virtual row can always pass through.
|
||||
// if (!add_virtual_row)
|
||||
// {
|
||||
executePrewhereActionsAndFilterColumns(read_result);
|
||||
// }
|
||||
executePrewhereActionsAndFilterColumns(read_result);
|
||||
|
||||
read_result.checkInternalConsistency();
|
||||
|
||||
|
@ -300,7 +300,7 @@ public:
|
||||
LoggerPtr log;
|
||||
};
|
||||
|
||||
ReadResult read(size_t max_rows, MarkRanges & ranges, bool add_virtual_row);
|
||||
ReadResult read(size_t max_rows, MarkRanges & ranges);
|
||||
|
||||
const Block & getSampleBlock() const { return result_sample_block; }
|
||||
|
||||
|
@ -158,7 +158,7 @@ MergeTreeReadTask::BlockAndProgress MergeTreeReadTask::read(const BlockSizeParam
|
||||
UInt64 recommended_rows = estimateNumRows(params);
|
||||
UInt64 rows_to_read = std::max(static_cast<UInt64>(1), std::min(params.max_block_size_rows, recommended_rows));
|
||||
|
||||
auto read_result = range_readers.main.read(rows_to_read, mark_ranges, add_virtual_row);
|
||||
auto read_result = range_readers.main.read(rows_to_read, mark_ranges);
|
||||
|
||||
if (add_virtual_row)
|
||||
{
|
||||
|
@ -117,7 +117,6 @@ public:
|
||||
size_t row_count = 0;
|
||||
size_t num_read_rows = 0;
|
||||
size_t num_read_bytes = 0;
|
||||
bool is_virtual_row = false;
|
||||
};
|
||||
|
||||
MergeTreeReadTask(
|
||||
@ -141,8 +140,6 @@ public:
|
||||
static Readers createReaders(const MergeTreeReadTaskInfoPtr & read_info, const Extras & extras, const MarkRanges & ranges);
|
||||
static RangeReaders createRangeReaders(const Readers & readers, const PrewhereExprInfo & prewhere_actions);
|
||||
|
||||
void addVirtualRow() { add_virtual_row = true; }
|
||||
|
||||
private:
|
||||
UInt64 estimateNumRows(const BlockSizeParams & params) const;
|
||||
|
||||
|
@ -133,38 +133,64 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
|
||||
|
||||
if (!task->getMainRangeReader().isInitialized())
|
||||
initializeRangeReaders();
|
||||
|
||||
add_virtual_row = false;
|
||||
if (add_virtual_row)
|
||||
{
|
||||
/// Turn on virtual row just once.
|
||||
task->addVirtualRow();
|
||||
add_virtual_row = false;
|
||||
}
|
||||
|
||||
auto res = algorithm->readFromTask(*task, block_size_params);
|
||||
const auto & primary_key = storage_snapshot->metadata->primary_key;
|
||||
|
||||
MergeTreeReadTask::BlockAndProgress res;
|
||||
res.row_count = 1;
|
||||
|
||||
if (res.row_count)
|
||||
{
|
||||
/// Reorder the columns according to result_header
|
||||
Columns ordered_columns;
|
||||
ordered_columns.reserve(result_header.columns());
|
||||
for (size_t i = 0; i < result_header.columns(); ++i)
|
||||
{
|
||||
auto name = result_header.getByPosition(i).name;
|
||||
ordered_columns.push_back(res.block.getByName(name).column);
|
||||
// TODO: composite pk???
|
||||
const ColumnWithTypeAndName & type_and_name = result_header.getByPosition(i);
|
||||
if (type_and_name.name == primary_key.column_names[0] && type_and_name.type == primary_key.data_types[0])
|
||||
ordered_columns.push_back(index[0]->cloneResized(1)); // TODO: use the first range pk whose range might contain results
|
||||
else
|
||||
ordered_columns.push_back(type_and_name.type->createColumn()->cloneResized(1));
|
||||
}
|
||||
|
||||
return ChunkAndProgress{
|
||||
.chunk = Chunk(ordered_columns, res.row_count,
|
||||
add_part_level || res.is_virtual_row ? std::make_shared<MergeTreeReadInfo>(
|
||||
(add_part_level ? task->getInfo().data_part->info.level : 0), res.is_virtual_row) : nullptr),
|
||||
.chunk = Chunk(ordered_columns, res.row_count, std::make_shared<MergeTreeReadInfo>(
|
||||
(add_part_level ? task->getInfo().data_part->info.level : 0), true)),
|
||||
.num_read_rows = res.num_read_rows,
|
||||
.num_read_bytes = res.num_read_bytes,
|
||||
.is_finished = false};
|
||||
}
|
||||
else
|
||||
{
|
||||
return {Chunk(), res.num_read_rows, res.num_read_bytes, false};
|
||||
auto res = algorithm->readFromTask(*task, block_size_params);
|
||||
|
||||
if (res.row_count)
|
||||
{
|
||||
/// Reorder the columns according to result_header
|
||||
Columns ordered_columns;
|
||||
ordered_columns.reserve(result_header.columns());
|
||||
for (size_t i = 0; i < result_header.columns(); ++i)
|
||||
{
|
||||
auto name = result_header.getByPosition(i).name;
|
||||
ordered_columns.push_back(res.block.getByName(name).column);
|
||||
}
|
||||
|
||||
return ChunkAndProgress{
|
||||
.chunk = Chunk(ordered_columns, res.row_count,
|
||||
add_part_level ? std::make_shared<MergeTreeReadInfo>(
|
||||
(add_part_level ? task->getInfo().data_part->info.level : 0), false) : nullptr),
|
||||
.num_read_rows = res.num_read_rows,
|
||||
.num_read_bytes = res.num_read_bytes,
|
||||
.is_finished = false};
|
||||
}
|
||||
else
|
||||
{
|
||||
return {Chunk(), res.num_read_rows, res.num_read_bytes, false};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -65,7 +65,11 @@ public:
|
||||
|
||||
void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; }
|
||||
|
||||
void addVirtualRowToChunk(bool add_virtual_row_) { add_virtual_row = add_virtual_row_; }
|
||||
void addVirtualRowToChunk(bool add_virtual_row_, const Columns& index_)
|
||||
{
|
||||
add_virtual_row = add_virtual_row_;
|
||||
index = index_;
|
||||
}
|
||||
|
||||
private:
|
||||
/// This struct allow to return block with no columns but with non-zero number of rows similar to Chunk
|
||||
@ -105,6 +109,8 @@ private:
|
||||
/// Virtual row is useful for read-in-order optimization when multiple parts exist.
|
||||
bool add_virtual_row = false;
|
||||
|
||||
Columns index;
|
||||
|
||||
LoggerPtr log = getLogger("MergeTreeSelectProcessor");
|
||||
std::atomic<bool> is_cancelled{false};
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user