This commit is contained in:
jsc0218 2024-09-19 11:02:21 -04:00 committed by GitHub
commit e31435cc01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 752 additions and 87 deletions

View File

@ -1,5 +1,5 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB

View File

@ -1,5 +1,5 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{

View File

@ -1,29 +0,0 @@
#pragma once

#include <Processors/Chunk.h>

namespace DB
{

/// To carry part level if chunk is produced by a merge tree source
class MergeTreePartLevelInfo : public ChunkInfoCloneable<MergeTreePartLevelInfo>
{
public:
    MergeTreePartLevelInfo() = delete;
    /// NOTE(review): parameter is ssize_t while the member is size_t — a negative
    /// level would wrap on assignment; presumably part levels are non-negative,
    /// confirm at call sites.
    explicit MergeTreePartLevelInfo(ssize_t part_level)
        : origin_merge_tree_part_level(part_level)
    { }
    MergeTreePartLevelInfo(const MergeTreePartLevelInfo & other) = default;

    size_t origin_merge_tree_part_level = 0;
};

/// Returns the level of the part the chunk was read from,
/// or 0 when the chunk carries no MergeTreePartLevelInfo.
inline size_t getPartLevelFromChunk(const Chunk & chunk)
{
    const auto part_level_info = chunk.getChunkInfos().get<MergeTreePartLevelInfo>();
    if (part_level_info)
        return part_level_info->origin_merge_tree_part_level;
    return 0;
}

}

View File

@ -0,0 +1,38 @@
#pragma once

#include <Processors/Chunk.h>

namespace DB
{

/// To carry part level and virtual row if chunk is produced by a merge tree source.
/// Replaces MergeTreePartLevelInfo, adding the virtual-row flag used by the
/// read-in-order optimization.
class MergeTreeReadInfo : public ChunkInfoCloneable<MergeTreeReadInfo>
{
public:
    MergeTreeReadInfo() = delete;
    /// Fix naming consistency: both constructor parameters now carry the trailing
    /// underscore (the original mixed `part_level` with `virtual_row_`).
    explicit MergeTreeReadInfo(size_t part_level_, bool virtual_row_) :
        origin_merge_tree_part_level(part_level_), virtual_row(virtual_row_) { }
    MergeTreeReadInfo(const MergeTreeReadInfo & other) = default;

    /// Level of the merge tree part the chunk originates from.
    size_t origin_merge_tree_part_level = 0;
    /// If virtual_row is true, the chunk must contain the virtual row only.
    bool virtual_row = false;
};

/// Returns the originating part level, or 0 when no MergeTreeReadInfo is attached.
inline size_t getPartLevelFromChunk(const Chunk & chunk)
{
    const auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
    if (read_info)
        return read_info->origin_merge_tree_part_level;
    return 0;
}

/// Returns true iff the chunk is a virtual row; false when no info is attached.
inline bool getVirtualRowFromChunk(const Chunk & chunk)
{
    const auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
    if (read_info)
        return read_info->virtual_row;
    return false;
}

}

View File

@ -1,3 +1,4 @@
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBuffer.h>
@ -7,6 +8,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
MergingSortedAlgorithm::MergingSortedAlgorithm(
Block header_,
size_t num_inputs,
@ -133,6 +139,9 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue
auto current = queue.current();
if (getVirtualRowFromChunk(current_inputs[current.impl->order].chunk))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Virtual row is not implemented for Non-batch mode.");
if (current.impl->isLast() && current_inputs[current.impl->order].skip_last_row)
{
/// Get the next block from the corresponding source, if there is one.
@ -229,6 +238,15 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeBatchImpl(TSortingQueue &
auto [current_ptr, initial_batch_size] = queue.current();
auto current = *current_ptr;
if (getVirtualRowFromChunk(current_inputs[current.impl->order].chunk))
{
/// If virtual row is detected, there should be only one row as a single chunk,
/// and always skip this chunk to pull the next one.
chassert(initial_batch_size == 1);
queue.removeTop();
return Status(current.impl->order);
}
bool batch_skip_last_row = false;
if (current.impl->isLast(initial_batch_size) && current_inputs[current.impl->order].skip_last_row)
{

View File

@ -1,3 +1,4 @@
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <Processors/Merges/IMergingTransform.h>
namespace DB
@ -101,8 +102,13 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
/// setNotNeeded after reading first chunk, because in optimistic case
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
/// we won't have to read any chunks anymore;
auto chunk = input.pull(limit_hint != 0);
if ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end)
/// If virtual row exists, let it pass through, so don't read more chunks.
auto chunk = input.pull(true);
bool virtual_row = getVirtualRowFromChunk(chunk);
if (limit_hint == 0 && !virtual_row)
input.setNeeded();
if (!virtual_row && ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end))
input.setNeeded();
if (!chunk.hasRows())

View File

@ -1,4 +1,5 @@
#include <Processors/QueryPlan/BufferChunksTransform.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{
@ -48,14 +49,27 @@ IProcessor::Status BufferChunksTransform::prepare()
}
else if (input.hasData())
{
auto chunk = pullChunk();
bool virtual_row;
auto chunk = pullChunk(virtual_row);
output.push(std::move(chunk));
if (virtual_row)
{
input.setNotNeeded();
return Status::PortFull;
}
}
}
if (input.hasData() && (num_buffered_rows < max_rows_to_buffer || num_buffered_bytes < max_bytes_to_buffer))
{
auto chunk = pullChunk();
bool virtual_row;
auto chunk = pullChunk(virtual_row);
if (virtual_row)
{
output.push(std::move(chunk));
input.setNotNeeded();
return Status::PortFull;
}
num_buffered_rows += chunk.getNumRows();
num_buffered_bytes += chunk.bytes();
chunks.push(std::move(chunk));
@ -71,10 +85,12 @@ IProcessor::Status BufferChunksTransform::prepare()
return Status::NeedData;
}
Chunk BufferChunksTransform::pullChunk()
Chunk BufferChunksTransform::pullChunk(bool & virtual_row)
{
auto chunk = input.pull();
num_processed_rows += chunk.getNumRows();
virtual_row = getVirtualRowFromChunk(chunk);
if (!virtual_row)
num_processed_rows += chunk.getNumRows();
if (limit && num_processed_rows >= limit)
input.close();

View File

@ -24,7 +24,7 @@ public:
String getName() const override { return "BufferChunks"; }
private:
Chunk pullChunk();
Chunk pullChunk(bool & virtual_row);
InputPort & input;
OutputPort & output;

View File

@ -104,6 +104,17 @@ static QueryPlan::Node * findReadingStep(QueryPlan::Node & node, StepStack & bac
return nullptr;
}
static bool checkVirtualRowSupport(const StepStack & backward_path)
{
for (size_t i = 0; i < backward_path.size() - 1; i++)
{
IQueryPlanStep * step = backward_path[i];
if (!typeid_cast<ExpressionStep *>(step) && !typeid_cast<FilterStep *>(step))
return false;
}
return true;
}
void updateStepsDataStreams(StepStack & steps_to_update)
{
/// update data stream's sorting properties for found transforms
@ -382,6 +393,7 @@ InputOrderInfoPtr buildInputOrderInfo(
int read_direction = 0;
size_t next_description_column = 0;
size_t next_sort_key = 0;
bool first_prefix_fixed = false;
while (next_description_column < description.size() && next_sort_key < sorting_key.column_names.size())
{
@ -459,6 +471,9 @@ InputOrderInfoPtr buildInputOrderInfo(
}
else if (fixed_key_columns.contains(sort_column_node))
{
if (next_sort_key == 0)
first_prefix_fixed = true;
//std::cerr << "+++++++++ Found fixed key by match" << std::endl;
++next_sort_key;
}
@ -493,7 +508,7 @@ InputOrderInfoPtr buildInputOrderInfo(
if (read_direction == 0 || order_key_prefix_descr.empty())
return nullptr;
return std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit);
return std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit, first_prefix_fixed);
}
/// We really need three different sort descriptions here.
@ -697,7 +712,7 @@ AggregationInputOrder buildInputOrderInfo(
for (const auto & key : not_matched_group_by_keys)
group_by_sort_description.emplace_back(SortColumnDescription(std::string(key)));
auto input_order = std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, /*read_direction*/ 1, /* limit */ 0);
auto input_order = std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, /*read_direction*/ 1, /* limit */ 0, false);
return { std::move(input_order), std::move(sort_description_for_merging), std::move(group_by_sort_description) };
}
@ -832,6 +847,11 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
bool can_read = reading->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
if (!can_read)
return nullptr;
if (!checkVirtualRowSupport(backward_path))
reading->setVirtualRowStatus(ReadFromMergeTree::VirtualRowStatus::No);
else if (!order_info->first_prefix_fixed)
reading->setVirtualRowStatus(ReadFromMergeTree::VirtualRowStatus::Possible);
}
return order_info;

View File

@ -27,6 +27,7 @@
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/SelectByIndicesTransform.h>
#include <Processors/Transforms/VirtualRowTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeIndexLegacyVectorSimilarity.h>
@ -589,7 +590,8 @@ Pipe ReadFromMergeTree::readInOrder(
Names required_columns,
PoolSettings pool_settings,
ReadType read_type,
UInt64 read_limit)
UInt64 read_limit,
bool enable_current_virtual_row)
{
/// For reading in order it makes sense to read only
/// one range per task to reduce number of read rows.
@ -699,7 +701,20 @@ Pipe ReadFromMergeTree::readInOrder(
if (set_total_rows_approx)
source->addTotalRowsApprox(total_rows);
pipes.emplace_back(std::move(source));
Pipe pipe(source);
if (enable_current_virtual_row && (read_type == ReadType::InOrder))
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<VirtualRowTransform>(header,
storage_snapshot->metadata->primary_key,
part_with_ranges.data_part->getIndex(),
part_with_ranges.ranges.front().begin);
});
}
pipes.emplace_back(std::move(pipe));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
@ -742,7 +757,7 @@ Pipe ReadFromMergeTree::read(
if (read_type == ReadType::Default && (max_streams > 1 || checkAllPartsOnRemoteFS(parts_with_range)))
return readFromPool(std::move(parts_with_range), std::move(required_columns), std::move(pool_settings));
auto pipe = readInOrder(parts_with_range, required_columns, pool_settings, read_type, /*limit=*/ 0);
auto pipe = readInOrder(parts_with_range, required_columns, pool_settings, read_type, /*limit=*/ 0, false);
/// Use ConcatProcessor to concat sources together.
/// It is needed to read in parts order (and so in PK order) if single thread is used.
@ -1051,7 +1066,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
/// For parallel replicas the split will be performed on the initiator side.
if (is_parallel_reading_from_replicas)
{
pipes.emplace_back(readInOrder(std::move(parts_with_ranges), column_names, pool_settings, read_type, input_order_info->limit));
pipes.emplace_back(readInOrder(std::move(parts_with_ranges), column_names, pool_settings, read_type, input_order_info->limit, false));
}
else
{
@ -1124,8 +1139,34 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
splitted_parts_and_ranges.emplace_back(std::move(new_parts));
}
bool primary_key_type_supports_virtual_row = true;
const auto & actions = storage_snapshot->metadata->getPrimaryKey().expression->getActions();
for (const auto & action : actions)
{
if (action.node->type != ActionsDAG::ActionType::INPUT)
{
primary_key_type_supports_virtual_row = false;
break;
}
}
/// If marked as possible in the optimization stage, check whether there is more than one branch.
if (virtual_row_status == VirtualRowStatus::Possible)
virtual_row_status = splitted_parts_and_ranges.size() > 1
|| (splitted_parts_and_ranges.size() == 1 && splitted_parts_and_ranges[0].size() > 1)
? VirtualRowStatus::Yes : VirtualRowStatus::NoConsiderInLogicalPlan;
for (auto && item : splitted_parts_and_ranges)
pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit));
{
bool enable_current_virtual_row = false;
if (virtual_row_status == VirtualRowStatus::Yes)
enable_current_virtual_row = true;
else if (virtual_row_status == VirtualRowStatus::NoConsiderInLogicalPlan)
enable_current_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1;
pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit,
enable_current_virtual_row && primary_key_type_supports_virtual_row));
}
}
Block pipe_header;
@ -1811,7 +1852,7 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
if (direction != 1 && query_info.isFinal())
return false;
query_info.input_order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, read_limit);
query_info.input_order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, read_limit, false);
reader_settings.read_in_order = true;
/// In case or read-in-order, don't create too many reading streams.

View File

@ -108,6 +108,14 @@ public:
using AnalysisResultPtr = std::shared_ptr<AnalysisResult>;
enum class VirtualRowStatus
{
NoConsiderInLogicalPlan,
Possible,
No,
Yes,
};
ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot_,
@ -210,6 +218,8 @@ public:
void applyFilters(ActionDAGNodes added_filter_nodes) override;
void setVirtualRowStatus(VirtualRowStatus virtual_row_status_) { virtual_row_status = virtual_row_status_; }
private:
int getSortDirection() const
{
@ -252,7 +262,7 @@ private:
Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
Pipe readFromPool(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings);
Pipe readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings);
Pipe readInOrder(RangesInDataParts parts_with_ranges, Names required_columns, PoolSettings pool_settings, ReadType read_type, UInt64 limit);
Pipe readInOrder(RangesInDataParts parts_with_ranges, Names required_columns, PoolSettings pool_settings, ReadType read_type, UInt64 limit, bool enable_current_virtual_row);
Pipe spreadMarkRanges(RangesInDataParts && parts_with_ranges, size_t num_streams, AnalysisResult & result, std::optional<ActionsDAG> & result_projection);
@ -282,6 +292,9 @@ private:
std::optional<MergeTreeReadTaskCallback> read_task_callback;
bool enable_vertical_final = false;
bool enable_remove_parts_from_snapshot_optimization = true;
VirtualRowStatus virtual_row_status = VirtualRowStatus::NoConsiderInLogicalPlan;
std::optional<size_t> number_of_current_replica;
};

View File

@ -0,0 +1,126 @@
#include <Processors/Transforms/VirtualRowTransform.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// One input port and one output port, both carrying the same header:
/// the transform is spliced transparently into an existing pipe.
VirtualRowTransform::VirtualRowTransform(const Block & header_,
    const KeyDescription & primary_key_,
    const IMergeTreeDataPart::Index & index_,
    size_t mark_range_begin_)
    : IProcessor({header_}, {header_})
    , input(inputs.front()), output(outputs.front())
    , header(header_), primary_key(primary_key_)
    , index(index_), mark_range_begin(mark_range_begin_)
{
}
/// Port state machine: push a previously generated chunk if one is ready,
/// request a work() call while generation is pending, otherwise pull the
/// next input chunk.
VirtualRowTransform::Status VirtualRowTransform::prepare()
{
    /// Check can output.
    if (output.isFinished())
    {
        input.close();
        return Status::Finished;
    }

    if (!output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    /// Output if has data.
    if (generated)
    {
        output.push(std::move(current_chunk));
        generated = false;
        return Status::PortFull;
    }

    /// can_generate starts out true (see header), so the very first Ready makes
    /// work() emit the virtual row before any real chunk is pulled.
    if (can_generate)
        return Status::Ready;

    /// Check can input.
    if (!has_input)
    {
        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        input.setNeeded();

        if (!input.hasData())
            return Status::NeedData;

        /// Set input port NotNeeded after chunk was pulled.
        current_chunk = input.pull(true);
        has_input = true;
    }

    /// Now transform.
    return Status::Ready;
}
/// Alternates between two jobs: on the first generate-pass it builds the
/// one-row "virtual row" chunk; afterwards it merely forwards chunks that
/// prepare() pulled from the input.
void VirtualRowTransform::work()
{
    if (can_generate)
    {
        if (generated)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "VirtualRowTransform cannot consume chunk because it already was generated");

        generated = true;
        can_generate = false;

        if (!is_first)
        {
            /// After the virtual row was emitted once, only forward the pulled chunk.
            if (current_chunk.empty())
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in VirtualRowTransform");
            return;
        }
        is_first = false;

        /// Build the single-row virtual chunk, reordering the columns according to
        /// the output header. Primary-key columns take their value from the PK index
        /// at the first mark of the range; every other column gets one default value.
        Columns ordered_columns;
        ordered_columns.reserve(header.columns());
        for (size_t i = 0, j = 0; i < header.columns(); ++i)
        {
            const ColumnWithTypeAndName & type_and_name = header.getByPosition(i);
            ColumnPtr current_column = type_and_name.type->createColumn();

            /// Compare data types semantically. The original used operator== on
            /// DataTypePtr, which compares the shared_ptr instances: two equal types
            /// allocated separately never match, silently replacing the PK value
            /// with a default one.
            if (j < index->size() && type_and_name.name == primary_key.column_names[j]
                && type_and_name.type->equals(*primary_key.data_types[j]))
            {
                auto column = current_column->cloneEmpty();
                column->insert((*(*index)[j])[mark_range_begin]);
                ordered_columns.push_back(std::move(column));
                ++j;
            }
            else
                ordered_columns.push_back(current_column->cloneResized(1));
        }

        current_chunk.setColumns(ordered_columns, 1);
        /// Tag the chunk as a virtual row (part level 0) so downstream merging
        /// algorithms can recognize and skip it.
        current_chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(0, true));
    }
    else
    {
        if (!has_input)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "VirtualRowTransform cannot consume chunk because it wasn't read");

        has_input = false;
        can_generate = true;
    }
}
}

View File

@ -0,0 +1,42 @@
#pragma once

#include <Processors/IProcessor.h>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>

namespace DB
{

/// Virtual row is useful for read-in-order optimization when multiple parts exist.
class VirtualRowTransform : public IProcessor
{
public:
    explicit VirtualRowTransform(const Block & header_,
        const KeyDescription & primary_key_,
        const IMergeTreeDataPart::Index & index_,
        size_t mark_range_begin_);

    String getName() const override { return "VirtualRowTransform"; }

    Status prepare() override;
    void work() override;

private:
    InputPort & input;
    OutputPort & output;

    Chunk current_chunk;
    /// True while a pulled input chunk waits to be processed by work().
    bool has_input = false;
    /// True while current_chunk holds output ready to be pushed by prepare().
    bool generated = false;
    /// True when the next work() call should produce output; starts true so the
    /// virtual row is emitted before any real chunk.
    bool can_generate = true;
    /// True until the virtual row has been generated.
    bool is_first = true;

    Block header;
    KeyDescription primary_key;
    /// PK index used in virtual row.
    IMergeTreeDataPart::Index index;
    /// The first range that might contain the candidate.
    size_t mark_range_begin;
};

}

View File

@ -6,7 +6,7 @@
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeArray.h>
#include <Processors/Chunk.h>
@ -147,7 +147,7 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
auto chunk = Chunk(ordered_columns, res.row_count);
if (add_part_level)
chunk.getChunkInfos().add(std::make_shared<MergeTreePartLevelInfo>(task->getInfo().data_part->info.level));
chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(task->getInfo().data_part->info.level, false));
return ChunkAndProgress{
.chunk = std::move(chunk),

View File

@ -14,9 +14,10 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Common/logger_useful.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <Storages/MergeTree/checkDataPart.h>
namespace DB
{
@ -266,7 +267,7 @@ try
auto result = Chunk(std::move(res_columns), rows_read);
if (add_part_level)
result.getChunkInfos().add(std::make_shared<MergeTreePartLevelInfo>(data_part->info.level));
result.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(data_part->info.level, false));
return result;
}
}

View File

@ -253,7 +253,7 @@ InputOrderInfoPtr ReadInOrderOptimizer::getInputOrderImpl(
if (sort_description_for_merging.empty())
return {};
return std::make_shared<InputOrderInfo>(std::move(sort_description_for_merging), key_pos, read_direction, limit);
return std::make_shared<InputOrderInfo>(std::move(sort_description_for_merging), key_pos, read_direction, limit, false);
}
InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(

View File

@ -119,13 +119,22 @@ struct InputOrderInfo
const int direction;
const UInt64 limit;
/** For virtual row optimization only
* for example, when pk is (a,b), a = 1, order by b, virtual row should be
* disabled in the following case:
* 1st part (0, 100), (1, 2), (1, 3), (1, 4)
* 2nd part (0, 100), (1, 2), (1, 3), (1, 4).
*/
bool first_prefix_fixed;
InputOrderInfo(
const SortDescription & sort_description_for_merging_,
size_t used_prefix_of_sorting_key_size_,
int direction_, UInt64 limit_)
int direction_, UInt64 limit_, bool first_prefix_fixed_)
: sort_description_for_merging(sort_description_for_merging_)
, used_prefix_of_sorting_key_size(used_prefix_of_sorting_key_size_)
, direction(direction_), limit(limit_)
, first_prefix_fixed(first_prefix_fixed_)
{
}

View File

@ -14,7 +14,10 @@ ExpressionTransform
(Expression)
ExpressionTransform × 2
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
2020-10-01 9
2020-10-01 9
2020-10-01 9
@ -51,7 +54,10 @@ ExpressionTransform
(Expression)
ExpressionTransform × 2
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
2020-10-11 0
2020-10-11 0
2020-10-11 0

View File

@ -160,52 +160,100 @@ ExpressionTransform × 16
(ReadFromMergeTree)
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
1000000
Skip merging: 1
Skip merging: 1

View File

@ -0,0 +1,42 @@
0
1
2
3
16384
========
16385
16386
16387
16388
24576
========
0
1
2
3
16384
========
16385
16386
16387
16388
24576
========
1 2
1 2
1 3
1 3
1 4
1 4
1 2
1 2
1 3
1 3
1 4
1 4
========
1 3
1 2
1 1
-- test distinct ----
0

View File

@ -0,0 +1,217 @@
-- Tests for the virtual-row optimization of read-in-order queries.
-- Each case pins the number of rows read (read_rows from system.query_log),
-- so a regression in virtual-row handling shows up as extra chunks being read.
DROP TABLE IF EXISTS t;

CREATE TABLE t
(
`x` UInt64,
`y` UInt64,
`z` UInt64,
`k` UInt64
)
ENGINE = MergeTree
ORDER BY (x, y, z)
SETTINGS index_granularity = 8192,
index_granularity_bytes = 10485760;

SYSTEM STOP MERGES t;

-- Two parts with disjoint ranges of the sorting key (x, y, z).
INSERT INTO t SELECT
number,
number,
number,
number
FROM numbers(8192 * 3);

INSERT INTO t SELECT
number + (8192 * 3),
number + (8192 * 3),
number + (8192 * 3),
number
FROM numbers(8192 * 3);

-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192),
-- both chunks come from the same part.
SELECT x
FROM t
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'preliminary merge, no filter';

SYSTEM FLUSH LOGS;

SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'preliminary merge, no filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
limit 1;

SELECT '========';

-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192),
-- all chunks come from the same part.
SELECT k
FROM t
WHERE k > 8192 * 2
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'preliminary merge with filter';

SYSTEM FLUSH LOGS;

SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'preliminary merge with filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;

SELECT '========';

-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192),
-- both chunks come from the same part.
SELECT x
FROM t
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'no preliminary merge, no filter';

SYSTEM FLUSH LOGS;

SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'no preliminary merge, no filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;

SELECT '========';

-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192),
-- all chunks come from the same part.
SELECT k
FROM t
WHERE k > 8192 * 2
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'no preliminary merge, with filter';

SYSTEM FLUSH LOGS;

SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'no preliminary merge, with filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;

DROP TABLE t;

SELECT '========';

-- from 02149_read_in_order_fixed_prefix
DROP TABLE IF EXISTS fixed_prefix;

CREATE TABLE fixed_prefix(a UInt32, b UInt32)
ENGINE = MergeTree ORDER BY (a, b)
SETTINGS index_granularity = 3;

SYSTEM STOP MERGES fixed_prefix;

-- Two identical parts: virtual row must stay disabled when the first PK prefix is fixed.
INSERT INTO fixed_prefix VALUES (0, 100), (1, 2), (1, 3), (1, 4), (2, 5);
INSERT INTO fixed_prefix VALUES (0, 100), (1, 2), (1, 3), (1, 4), (2, 5);

SELECT a, b
FROM fixed_prefix
WHERE a = 1
ORDER BY b
SETTINGS max_threads = 1,
optimize_read_in_order = 1,
read_in_order_two_level_merge_threshold = 0; --force preliminary merge

SELECT a, b
FROM fixed_prefix
WHERE a = 1
ORDER BY b
SETTINGS max_threads = 1,
optimize_read_in_order = 1,
read_in_order_two_level_merge_threshold = 5; --avoid preliminary merge

DROP TABLE fixed_prefix;

SELECT '========';

-- currently don't support virtual row in this case
DROP TABLE IF EXISTS function_pk;

-- Sorting key contains an expression (-B), not a plain column.
CREATE TABLE function_pk
(
`A` Int64,
`B` Int64
)
ENGINE = MergeTree ORDER BY (A, -B)
SETTINGS index_granularity = 1;

SYSTEM STOP MERGES function_pk;

INSERT INTO function_pk values(1,1);
INSERT INTO function_pk values(1,3);
INSERT INTO function_pk values(1,2);

SELECT *
FROM function_pk
ORDER BY (A,-B) ASC
limit 3
SETTINGS max_threads = 1,
optimize_read_in_order = 1,
read_in_order_two_level_merge_threshold = 0; --force preliminary merge

DROP TABLE function_pk;

-- modified from 02317_distinct_in_order_optimization
SELECT '-- test distinct ----';

DROP TABLE IF EXISTS distinct_in_order SYNC;

CREATE TABLE distinct_in_order
(
`a` int,
`b` int,
`c` int
)
ENGINE = MergeTree
ORDER BY (a, b)
SETTINGS index_granularity = 8192,
index_granularity_bytes = '10Mi';

SYSTEM STOP MERGES distinct_in_order;

-- a = number % number is always 0 here (numbers start at 1),
-- so DISTINCT a must yield a single row.
INSERT INTO distinct_in_order SELECT
number % number,
number % 5,
number % 10
FROM numbers(1, 1000000);

SELECT DISTINCT a
FROM distinct_in_order
ORDER BY a ASC
SETTINGS read_in_order_two_level_merge_threshold = 0,
optimize_read_in_order = 1,
max_threads = 2;

DROP TABLE distinct_in_order;

View File

@ -0,0 +1,25 @@
(Expression)
ExpressionTransform
(Sorting)
MergingSortedTransform 4 → 1
(Expression)
ExpressionTransform × 4
(ReadFromMergeTree)
ExpressionTransform × 5
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
ExpressionTransform
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1

View File

@ -0,0 +1,26 @@
-- Tags: no-random-merge-tree-settings

SET optimize_read_in_order = 1, merge_tree_min_rows_for_concurrent_read = 1000;

DROP TABLE IF EXISTS tab;

-- One row per granule to make every mark a potential virtual-row candidate.
CREATE TABLE tab
(
`t` DateTime
)
ENGINE = MergeTree
ORDER BY t
SETTINGS index_granularity = 1;

SYSTEM STOP MERGES tab;

-- Three parts with disjoint time ranges, inserted out of chronological order.
INSERT INTO tab SELECT toDateTime('2024-01-10') + number FROM numbers(10000);
INSERT INTO tab SELECT toDateTime('2024-01-30') + number FROM numbers(10000);
INSERT INTO tab SELECT toDateTime('2024-01-20') + number FROM numbers(10000);

-- The pipeline layout is compared against the reference file; buffering is
-- disabled so the transform placement stays deterministic.
EXPLAIN PIPELINE
SELECT *
FROM tab
ORDER BY t ASC
SETTINGS read_in_order_two_level_merge_threshold = 0, max_threads = 4, read_in_order_use_buffering = 0
FORMAT tsv;