sketch of read-in-order optimization

jsc0218 2024-04-01 02:08:35 +00:00
parent 03f8334ea1
commit becbef9e48
14 changed files with 106 additions and 39 deletions

View File

@ -1,5 +1,5 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB

View File

@ -1,5 +1,5 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{

View File

@ -1,25 +0,0 @@
#pragma once
#include <Processors/Chunk.h>
namespace DB
{
/// To carry part level if chunk is produced by a merge tree source
class MergeTreePartLevelInfo : public ChunkInfo
{
public:
MergeTreePartLevelInfo() = delete;
explicit MergeTreePartLevelInfo(ssize_t part_level) : origin_merge_tree_part_level(part_level) { }
size_t origin_merge_tree_part_level = 0;
};
inline size_t getPartLevelFromChunk(const Chunk & chunk)
{
const auto & info = chunk.getChunkInfo();
if (const auto * part_level_info = typeid_cast<const MergeTreePartLevelInfo *>(info.get()))
return part_level_info->origin_merge_tree_part_level;
return 0;
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Processors/Chunk.h>
namespace DB
{
/// To carry the part level and the virtual-row flag if the chunk is produced by a merge tree source
class MergeTreeReadInfo : public ChunkInfo
{
public:
MergeTreeReadInfo() = delete;
explicit MergeTreeReadInfo(size_t part_level, bool virtual_row_) :
origin_merge_tree_part_level(part_level), virtual_row(virtual_row_) { }
size_t origin_merge_tree_part_level = 0;
bool virtual_row = false;
};
inline size_t getPartLevelFromChunk(const Chunk & chunk)
{
const auto & info = chunk.getChunkInfo();
if (const auto * read_info = typeid_cast<const MergeTreeReadInfo *>(info.get()))
return read_info->origin_merge_tree_part_level;
return 0;
}
inline bool getVirtualRowFromChunk(const Chunk & chunk)
{
const auto & info = chunk.getChunkInfo();
if (const auto * read_info = typeid_cast<const MergeTreeReadInfo *>(info.get()))
return read_info->virtual_row;
return false;
}
}
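A minimal usage sketch of the new helpers (illustrative only; columns, num_rows, and part are placeholders, not names from this diff):
/// Producer side: attach the read info when forming a chunk (as MergeTreeSelectProcessor does below).
Chunk chunk(std::move(columns), num_rows,
    std::make_shared<MergeTreeReadInfo>(part->info.level, /*virtual_row_=*/ true));
/// Consumer side: query the info without caring which source produced the chunk.
if (getVirtualRowFromChunk(chunk))
{
    /// A single-row virtual chunk: use it only for ordering decisions, never emit it as data.
}
size_t level = getPartLevelFromChunk(chunk); /// 0 if no MergeTreeReadInfo is attached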

View File

@ -1,3 +1,4 @@
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBuffer.h>
@ -239,6 +240,15 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeBatchImpl(TSortingQueue &
auto [current_ptr, initial_batch_size] = queue.current();
auto current = *current_ptr;
if (getVirtualRowFromChunk(current_inputs[current.impl->order].chunk))
{
/// A virtual row always arrives as a single-row chunk;
/// skip it and pull the next chunk from the same input.
assert(initial_batch_size == 1);
queue.removeTop();
return Status(current.impl->order);
}
bool batch_skip_last_row = false;
if (current.impl->isLast(initial_batch_size) && current_inputs[current.impl->order].skip_last_row)
{

View File

@ -501,7 +501,8 @@ Pipe ReadFromMergeTree::readInOrder(
Names required_columns,
PoolSettings pool_settings,
ReadType read_type,
UInt64 limit)
UInt64 limit,
bool need_virtual_row)
{
/// For reading in order it makes sense to read only
/// one range per task to reduce the number of rows read.
@ -596,6 +597,8 @@ Pipe ReadFromMergeTree::readInOrder(
processor->addPartLevelToChunk(isQueryWithFinal());
processor->addVirtualRowToChunk(need_virtual_row);
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
if (set_rows_approx)
source->addTotalRowsApprox(total_rows);
@ -1028,7 +1031,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
}
for (auto && item : splitted_parts_and_ranges)
pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit));
{
/// need_virtual_row = true means a MergingSortedTransform will follow downstream;
/// in that case, adding a virtual row can speed up reading when the stream covers multiple parts.
bool need_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1;
pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit, need_virtual_row));
}
}
Block pipe_header;
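For orientation, a rough sketch of the pipeline shape this change targets (assumed from the condition above, not spelled out in the commit): each part in a stream gets its own in-order source, their outputs are merged by a MergingSortedTransform, and the virtual row each source emits first is meant to let the merge decide which part to read next without materializing real rows.
/// Illustrative shape when need_virtual_row is true (multiple parts in one stream):
///
///   MergeTreeSelectProcessor (part 0, virtual row first) --\
///   MergeTreeSelectProcessor (part 1, virtual row first) ----> MergingSortedTransform -> ...
///   MergeTreeSelectProcessor (part 2, virtual row first) --/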

View File

@ -251,7 +251,7 @@ private:
Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
Pipe readFromPool(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings);
Pipe readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings);
Pipe readInOrder(RangesInDataParts parts_with_ranges, Names required_columns, PoolSettings pool_settings, ReadType read_type, UInt64 limit);
Pipe readInOrder(RangesInDataParts parts_with_ranges, Names required_columns, PoolSettings pool_settings, ReadType read_type, UInt64 limit, bool need_virtual_row = false);
Pipe spreadMarkRanges(RangesInDataParts && parts_with_ranges, size_t num_streams, AnalysisResult & result, ActionsDAGPtr & result_projection);

View File

@ -946,7 +946,7 @@ String addDummyColumnWithRowCount(Block & block, size_t num_rows)
}
MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, MarkRanges & ranges)
MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, MarkRanges & ranges, bool add_virtual_row)
{
if (max_rows == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected at least 1 row to read, got 0.");
@ -961,7 +961,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
if (prev_reader)
{
read_result = prev_reader->read(max_rows, ranges);
read_result = prev_reader->read(max_rows, ranges, add_virtual_row);
size_t num_read_rows;
Columns columns = continueReadingChain(read_result, num_read_rows);
@ -1026,8 +1026,15 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
}
else
{
// if (add_virtual_row)
// {
// generate the virtual row
// }
// else
// {
read_result = startReadingChain(max_rows, ranges);
read_result.num_rows = read_result.numReadRows();
// }
LOG_TEST(log, "First reader returned: {}, requested columns: {}",
read_result.dumpInfo(), dumpNames(merge_tree_reader->getColumns()));
@ -1062,7 +1069,11 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
read_result.addNumBytesRead(total_bytes);
}
executePrewhereActionsAndFilterColumns(read_result);
/// If add_virtual_row is enabled, do not apply PREWHERE, so that the virtual row always passes through.
// if (!add_virtual_row)
// {
executePrewhereActionsAndFilterColumns(read_result);
// }
read_result.checkInternalConsistency();
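The commented-out branch above only marks where the virtual row would be produced. One possible shape of that step, assuming the single row is filled from the part's in-memory primary index (names and placement are illustrative, not part of this commit):
/// Build the single-row "virtual" columns from the part's primary index:
/// one value per primary-key column, taken at the first mark of the range to be read.
Columns buildVirtualRowColumns(const Columns & primary_index, size_t first_mark)
{
    Columns res;
    res.reserve(primary_index.size());
    for (const auto & index_column : primary_index)
    {
        auto column = index_column->cloneEmpty();
        column->insertFrom(*index_column, first_mark);
        res.push_back(std::move(column));
    }
    return res;
}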

View File

@ -300,7 +300,7 @@ public:
LoggerPtr log;
};
ReadResult read(size_t max_rows, MarkRanges & ranges);
ReadResult read(size_t max_rows, MarkRanges & ranges, bool add_virtual_row);
const Block & getSampleBlock() const { return result_sample_block; }

View File

@ -158,7 +158,13 @@ MergeTreeReadTask::BlockAndProgress MergeTreeReadTask::read(const BlockSizeParam
UInt64 recommended_rows = estimateNumRows(params);
UInt64 rows_to_read = std::max(static_cast<UInt64>(1), std::min(params.max_block_size_rows, recommended_rows));
auto read_result = range_readers.main.read(rows_to_read, mark_ranges);
auto read_result = range_readers.main.read(rows_to_read, mark_ranges, add_virtual_row);
if (add_virtual_row)
{
/// The virtual row has been produced; it is emitted at most once per part.
add_virtual_row = false;
}
/// All rows were filtered. Repeat.
if (read_result.num_rows == 0)

View File

@ -117,6 +117,7 @@ public:
size_t row_count = 0;
size_t num_read_rows = 0;
size_t num_read_bytes = 0;
bool is_virtual_row = false;
};
MergeTreeReadTask(
@ -140,6 +141,8 @@ public:
static Readers createReaders(const MergeTreeReadTaskInfoPtr & read_info, const Extras & extras, const MarkRanges & ranges);
static RangeReaders createRangeReaders(const Readers & readers, const PrewhereExprInfo & prewhere_actions);
void addVirtualRow() { add_virtual_row = true; }
private:
UInt64 estimateNumRows(const BlockSizeParams & params) const;
@ -158,6 +161,9 @@ private:
/// Used to satisfy the preferred_block_size_bytes limitation
MergeTreeBlockSizePredictorPtr size_predictor;
/// If true, add the virtual row once, then reset to false.
bool add_virtual_row = false;
};
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;

View File

@ -6,7 +6,7 @@
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeArray.h>
#include <Processors/Chunk.h>
@ -134,6 +134,13 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
if (!task->getMainRangeReader().isInitialized())
initializeRangeReaders();
if (add_virtual_row)
{
/// Request the virtual row only once per task.
task->addVirtualRow();
add_virtual_row = false;
}
auto res = algorithm->readFromTask(*task, block_size_params);
if (res.row_count)
@ -148,7 +155,9 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
}
return ChunkAndProgress{
.chunk = Chunk(ordered_columns, res.row_count, add_part_level ? std::make_shared<MergeTreePartLevelInfo>(task->getInfo().data_part->info.level) : nullptr),
.chunk = Chunk(ordered_columns, res.row_count,
add_part_level || res.is_virtual_row ? std::make_shared<MergeTreeReadInfo>(
(add_part_level ? task->getInfo().data_part->info.level : 0), res.is_virtual_row) : nullptr),
.num_read_rows = res.num_read_rows,
.num_read_bytes = res.num_read_bytes,
.is_finished = false};

View File

@ -65,6 +65,8 @@ public:
void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; }
void addVirtualRowToChunk(bool add_virtual_row_) { add_virtual_row = add_virtual_row_; }
private:
/// This struct allows returning a block with no columns but a non-zero number of rows, similar to Chunk
struct BlockAndProgress
@ -99,6 +101,10 @@ private:
/// Should we add the part level to the produced chunk. The part level is useful for later steps if the query has FINAL
bool add_part_level = false;
/// Should we add a virtual row as the first single-row chunk.
/// A virtual row is useful for the read-in-order optimization when multiple parts exist.
bool add_virtual_row = false;
LoggerPtr log = getLogger("MergeTreeSelectProcessor");
std::atomic<bool> is_cancelled{false};
};

View File

@ -13,7 +13,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Common/logger_useful.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{
@ -262,7 +262,8 @@ try
++it;
}
return Chunk(std::move(res_columns), rows_read, add_part_level ? std::make_shared<MergeTreePartLevelInfo>(data_part->info.level) : nullptr);
return Chunk(std::move(res_columns), rows_read,
add_part_level ? std::make_shared<MergeTreeReadInfo>(data_part->info.level, false) : nullptr);
}
}
else