only focus on the direct mergesort case

This commit is contained in:
jsc0218 2024-09-10 01:31:01 +00:00
parent 26e74bc9ee
commit 4a67c68d0b
8 changed files with 83 additions and 119 deletions

View File

@ -392,7 +392,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
auto algorithm = std::make_unique<MergeTreeThreadSelectAlgorithm>(i);
auto processor = std::make_unique<MergeTreeSelectProcessor>(
pool, std::move(algorithm), storage_snapshot, prewhere_info,
pool, std::move(algorithm), prewhere_info,
actions_settings, block_size_copy, reader_settings);
auto source = std::make_shared<MergeTreeSource>(std::move(processor), data.getLogName());
@ -491,7 +491,7 @@ Pipe ReadFromMergeTree::readFromPool(
auto algorithm = std::make_unique<MergeTreeThreadSelectAlgorithm>(i);
auto processor = std::make_unique<MergeTreeSelectProcessor>(
pool, std::move(algorithm), storage_snapshot, prewhere_info,
pool, std::move(algorithm), prewhere_info,
actions_settings, block_size_copy, reader_settings);
auto source = std::make_shared<MergeTreeSource>(std::move(processor), data.getLogName());
@ -610,7 +610,7 @@ Pipe ReadFromMergeTree::readInOrder(
algorithm = std::make_unique<MergeTreeInOrderSelectAlgorithm>(i);
auto processor = std::make_unique<MergeTreeSelectProcessor>(
pool, std::move(algorithm), storage_snapshot, prewhere_info,
pool, std::move(algorithm), prewhere_info,
actions_settings, block_size, reader_settings);
processor->addPartLevelToChunk(isQueryWithFinal());

View File

@ -6,7 +6,6 @@
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/VirtualRowTransform.h>
#include <Processors/QueryPlan/BufferChunksTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/JSONBuilder.h>
@ -14,9 +13,6 @@
#include <Processors/ResizeProcessor.h>
#include <Processors/Transforms/ScatterByPartitionTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Storages/MergeTree/MergeTreeSource.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <memory>
@ -247,69 +243,11 @@ void SortingStep::finishSorting(
});
}
void SortingStep::enableVirtualRow(const QueryPipelineBuilder & pipeline) const
{
/// We check every step of this pipeline, to make sure virtual row can work correctly.
/// Currently ExpressionTransform is supported, should add other processors if possible.
const auto& pipe = pipeline.getPipe();
bool enable_virtual_row = true;
std::vector<std::shared_ptr<MergeTreeSource>> merge_tree_sources;
for (const auto & processor : pipe.getProcessors())
{
if (auto merge_tree_source = std::dynamic_pointer_cast<MergeTreeSource>(processor))
{
merge_tree_sources.push_back(merge_tree_source);
}
else if (!std::dynamic_pointer_cast<ExpressionTransform>(processor) && !std::dynamic_pointer_cast<VirtualRowTransform>(processor))
{
enable_virtual_row = false;
break;
}
}
/// If everything is okay, enable virtual row in MergeTreeSelectProcessor.
if (enable_virtual_row && merge_tree_sources.size() >= 2)
{
auto extractNameAfterDot = [](const String & name)
{
size_t pos = name.find_last_of('.');
return (pos != String::npos) ? name.substr(pos + 1) : name;
};
const ColumnWithTypeAndName & type_and_name = pipeline.getHeader().getByPosition(0);
String column_name = extractNameAfterDot(type_and_name.name);
for (const auto & merge_tree_source : merge_tree_sources)
{
const auto & merge_tree_select_processor = merge_tree_source->getProcessor();
/// Check pk is not func based, as we only check type and name in filling in primary key of virtual row.
const auto & primary_key = merge_tree_select_processor->getPrimaryKey();
const auto & actions = primary_key.expression->getActions();
bool is_okay = true;
for (const auto & action : actions)
{
if (action.node->type != ActionsDAG::ActionType::INPUT)
{
is_okay = false;
break;
}
}
/// We have to check further in the case of fixed prefix, for example,
/// primary key ab, query SELECT a, b FROM t WHERE a = 1 ORDER BY b,
/// merge sort would sort based on b, leading to wrong result in comparison.
if (is_okay && primary_key.column_names[0] == column_name && primary_key.data_types[0] == type_and_name.type)
merge_tree_select_processor->enableVirtualRow();
}
}
}
void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescription & result_sort_desc, const UInt64 limit_)
{
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
if (use_buffering && sort_settings.read_in_order_use_buffering)
{
pipeline.addSimpleTransform([&](const Block & header)
@ -318,8 +256,6 @@ void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescr
});
}
enableVirtualRow(pipeline);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),

View File

@ -118,8 +118,6 @@ private:
UInt64 limit_,
bool skip_partial_sort = false);
void enableVirtualRow(const QueryPipelineBuilder & pipeline) const;
Type type;
SortDescription prefix_description;

View File

@ -197,8 +197,6 @@ public:
void setQueryIdHolder(std::shared_ptr<QueryIdHolder> query_id_holder) { resources.query_id_holders.emplace_back(std::move(query_id_holder)); }
void addContext(ContextPtr context) { resources.interpreter_context.emplace_back(std::move(context)); }
const Pipe& getPipe() const { return pipe; }
/// Convert query pipeline to pipe.
static Pipe getPipe(QueryPipelineBuilder pipeline, QueryPlanResourceHolder & resources);
static QueryPipeline getPipeline(QueryPipelineBuilder builder);

View File

@ -26,14 +26,12 @@ namespace ErrorCodes
MergeTreeSelectProcessor::MergeTreeSelectProcessor(
MergeTreeReadPoolPtr pool_,
MergeTreeSelectAlgorithmPtr algorithm_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const ExpressionActionsSettings & actions_settings_,
const MergeTreeReadTask::BlockSizeParams & block_size_params_,
const MergeTreeReaderSettings & reader_settings_)
: pool(std::move(pool_))
, algorithm(std::move(algorithm_))
, storage_snapshot(storage_snapshot_)
, prewhere_info(prewhere_info_)
, actions_settings(actions_settings_)
, prewhere_actions(getPrewhereActions(prewhere_info, actions_settings, reader_settings_.enable_multiple_prewhere_read_steps))

View File

@ -36,7 +36,6 @@ public:
MergeTreeSelectProcessor(
MergeTreeReadPoolPtr pool_,
MergeTreeSelectAlgorithmPtr algorithm_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const ExpressionActionsSettings & actions_settings_,
const MergeTreeReadTask::BlockSizeParams & block_size_params_,
@ -60,17 +59,12 @@ public:
void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; }
void enableVirtualRow() { enable_virtual_row = true; }
const KeyDescription & getPrimaryKey() const { return storage_snapshot->metadata->primary_key; }
private:
/// Sets up range readers corresponding to data readers
void initializeRangeReaders();
const MergeTreeReadPoolPtr pool;
const MergeTreeSelectAlgorithmPtr algorithm;
const StorageSnapshotPtr storage_snapshot;
const PrewhereInfoPtr prewhere_info;
const ExpressionActionsSettings actions_settings;
@ -89,12 +83,6 @@ private:
/// Should we add part level to produced chunk. Part level is useful for next steps if query has FINAL
bool add_part_level = false;
/// Should we add a virtual row as the single first chunk.
/// Virtual row is useful for read-in-order optimization when multiple parts exist.
bool enable_virtual_row = false;
/// PK index used in virtual row.
IMergeTreeDataPart::Index index;
LoggerPtr log = getLogger("MergeTreeSelectProcessor");
std::atomic<bool> is_cancelled{false};
};

View File

@ -19,8 +19,6 @@ public:
Status prepare() override;
const MergeTreeSelectProcessorPtr& getProcessor() const { return processor; }
#if defined(OS_LINUX)
int schedule() override;
#endif

View File

@ -160,52 +160,100 @@ ExpressionTransform × 16
(ReadFromMergeTree)
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
1000000
Skip merging: 1
Skip merging: 1