remove more projections code

This commit is contained in:
Anton Popov 2024-01-09 15:48:08 +00:00
parent 2fd0f7be7b
commit e0d469b0e0
11 changed files with 45 additions and 121 deletions

View File

@ -590,7 +590,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
else if (!candidates.real.empty())
{
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();
size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;
/// Nothing to read. Ignore projections.
if (ordinary_reading_marks == 0)
@ -599,7 +599,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
return false;
}
const auto & parts_with_ranges = ordinary_reading_select_result->partsWithRanges();
const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
/// Selecting best candidate.
for (auto & candidate : candidates.real)

View File

@ -142,7 +142,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();
size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;
/// Nothing to read. Ignore projections.
if (ordinary_reading_marks == 0)
@ -151,7 +151,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
return false;
}
const auto & parts_with_ranges = ordinary_reading_select_result->partsWithRanges();
const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);

View File

@ -250,23 +250,17 @@ bool analyzeProjectionCandidate(
context->getSettingsRef().max_threads,
max_added_blocks);
if (projection_result_ptr->error())
return false;
candidate.merge_tree_projection_select_result_ptr = std::move(projection_result_ptr);
candidate.sum_marks += candidate.merge_tree_projection_select_result_ptr->marks();
candidate.sum_marks += candidate.merge_tree_projection_select_result_ptr->selected_marks;
if (!normal_parts.empty())
{
/// TODO: We can reuse existing analysis_result by filtering out projection parts
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), std::move(alter_conversions));
if (normal_result_ptr->error())
return false;
if (normal_result_ptr->marks() != 0)
if (normal_result_ptr->selected_marks != 0)
{
candidate.sum_marks += normal_result_ptr->marks();
candidate.sum_marks += normal_result_ptr->selected_marks;
candidate.merge_tree_ordinary_select_result_ptr = std::move(normal_result_ptr);
}
}

View File

@ -1,31 +1,15 @@
#pragma once
#include <Interpreters/ActionsDAG.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
namespace DB
{
class ReadFromMergeTree;
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
struct ProjectionDescription;
class MergeTreeDataSelectExecutor;
struct MergeTreeDataSelectAnalysisResult;
using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelectAnalysisResult>;
class IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
struct RangesInDataParts;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
struct SelectQueryInfo;
}
namespace DB::QueryPlanOptimizations
@ -61,8 +45,8 @@ struct ProjectionCandidate
/// Analysis result, separate for parts with and without projection.
/// Analysis is done in order to estimate the number of marks we are going to read.
/// For chosen projection, it is reused for reading step.
MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr;
MergeTreeDataSelectAnalysisResultPtr merge_tree_ordinary_select_result_ptr;
ReadFromMergeTree::AnalysisResultPtr merge_tree_projection_select_result_ptr;
ReadFromMergeTree::AnalysisResultPtr merge_tree_ordinary_select_result_ptr;
};
/// This function fills ProjectionCandidate structure for specified projection.

View File

@ -246,7 +246,7 @@ ReadFromMergeTree::ReadFromMergeTree(
bool sample_factor_column_queried_,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_,
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_,
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading)
: SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader(
storage_snapshot_->getSampleBlockForColumns(real_column_names_),
@ -1254,7 +1254,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
return merging_pipes.empty() ? Pipe::unitePipes(std::move(no_merging_pipes)) : Pipe::unitePipes(std::move(merging_pipes));
}
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions) const
{
@ -1454,7 +1454,7 @@ void ReadFromMergeTree::applyFilters()
buildIndexes(indexes, filter_actions_dag, data, prepared_parts, context, query_info, metadata_for_reading);
}
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
const PrewhereInfoPtr & prewhere_info,
@ -1490,7 +1490,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
indexes);
}
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
const StorageMetadataPtr & metadata_snapshot_base,
@ -1527,27 +1527,26 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
buildIndexes(indexes, query_info.filter_actions_dag, data, parts, context, query_info, metadata_snapshot);
if (indexes->part_values && indexes->part_values->empty())
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
return std::make_shared<AnalysisResult>(std::move(result));
if (settings.force_primary_key && indexes->key_condition.alwaysUnknownOrTrue())
{
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{
.result = std::make_exception_ptr(Exception(
ErrorCodes::INDEX_NOT_USED,
"Primary key ({}) is not used and setting 'force_primary_key' is set",
fmt::join(primary_key_column_names, ", ")))});
throw Exception(ErrorCodes::INDEX_NOT_USED,
"Primary key ({}) is not used and setting 'force_primary_key' is set",
fmt::join(primary_key_column_names, ", "));
}
LOG_DEBUG(log, "Key condition: {}", indexes->key_condition.toString());
if (indexes->part_offset_condition)
LOG_DEBUG(log, "Part offset condition: {}", indexes->part_offset_condition->toString());
if (indexes->key_condition.alwaysFalse())
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
return std::make_shared<AnalysisResult>(std::move(result));
size_t total_marks_pk = 0;
size_t parts_before_pk = 0;
try
{
MergeTreeDataSelectExecutor::filterPartsByPartition(
indexes->partition_pruner,
@ -1574,14 +1573,13 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
log);
if (result.sampling.read_nothing)
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
return std::make_shared<AnalysisResult>(std::move(result));
for (const auto & part : parts)
total_marks_pk += part->index_granularity.getMarksCountWithoutFinal();
parts_before_pk = parts.size();
auto reader_settings = getMergeTreeReaderSettings(context, query_info);
result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
std::move(parts),
std::move(alter_conversions),
@ -1596,10 +1594,6 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
result.index_stats,
indexes->use_skip_indexes);
}
catch (...)
{
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::current_exception()});
}
size_t sum_marks_pk = total_marks_pk;
for (const auto & stat : result.index_stats)
@ -1631,7 +1625,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
? ReadType::InOrder
: ReadType::InReverseOrder;
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
return std::make_shared<AnalysisResult>(std::move(result));
}
bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t limit)
@ -1764,11 +1758,11 @@ bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort()
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
{
auto result_ptr = analyzed_result_ptr ? analyzed_result_ptr : selectRangesToRead(prepared_parts, alter_conversions_for_parts);
if (std::holds_alternative<std::exception_ptr>(result_ptr->result))
std::rethrow_exception(std::get<std::exception_ptr>(result_ptr->result));
auto result_ptr = analyzed_result_ptr
? analyzed_result_ptr
: selectRangesToRead(prepared_parts, alter_conversions_for_parts);
return std::get<AnalysisResult>(result_ptr->result);
return *result_ptr;
}
bool ReadFromMergeTree::isQueryWithFinal() const
@ -2235,33 +2229,5 @@ void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const
}
}
/// Reports whether the analysis ended in failure.
/// Returns true when `result` currently holds a std::exception_ptr.
bool MergeTreeDataSelectAnalysisResult::error() const
{
return std::holds_alternative<std::exception_ptr>(result);
}
/// Returns `selected_marks` of the stored ReadFromMergeTree::AnalysisResult.
/// If `result` holds a std::exception_ptr instead, that exception is rethrown.
size_t MergeTreeDataSelectAnalysisResult::marks() const
{
/// Surface a stored failure before reading the value alternative.
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));
return std::get<ReadFromMergeTree::AnalysisResult>(result).selected_marks;
}
/// Returns `selected_rows` of the stored ReadFromMergeTree::AnalysisResult.
/// If `result` holds a std::exception_ptr instead, that exception is rethrown.
UInt64 MergeTreeDataSelectAnalysisResult::rows() const
{
/// Surface a stored failure before reading the value alternative.
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));
return std::get<ReadFromMergeTree::AnalysisResult>(result).selected_rows;
}
/// Returns a const reference to `parts_with_ranges` of the stored
/// ReadFromMergeTree::AnalysisResult. The reference points into `result`,
/// i.e. into this object.
/// If `result` holds a std::exception_ptr instead, that exception is rethrown.
const RangesInDataParts & MergeTreeDataSelectAnalysisResult::partsWithRanges() const
{
/// Surface a stored failure before reading the value alternative.
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));
return std::get<ReadFromMergeTree::AnalysisResult>(result).parts_with_ranges;
}
}

View File

@ -55,9 +55,6 @@ struct UsefulSkipIndexes
std::vector<MergedDataSkippingIndexAndCondition> merged_indices;
};
struct MergeTreeDataSelectAnalysisResult;
using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelectAnalysisResult>;
/// This step is created to read from MergeTree* table.
/// For now, it takes a list of parts and creates source from it.
class ReadFromMergeTree final : public SourceStepWithFilter
@ -108,6 +105,8 @@ public:
void checkLimits(const Settings & settings, const SelectQueryInfo & query_info_) const;
};
using AnalysisResultPtr = std::shared_ptr<AnalysisResult>;
ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
std::vector<AlterConversionsPtr> alter_conversions_,
@ -122,9 +121,8 @@ public:
bool sample_factor_column_queried_,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_,
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading
);
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading);
static constexpr auto name = "ReadFromMergeTree";
String getName() const override { return name; }
@ -157,7 +155,7 @@ public:
std::optional<std::unordered_set<String>> part_values;
};
static MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(
static AnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
const PrewhereInfoPtr & prewhere_info,
@ -174,7 +172,7 @@ public:
Poco::Logger * log,
std::optional<Indexes> & indexes);
MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(
AnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions) const;
@ -196,7 +194,7 @@ public:
bool willOutputEachPartitionThroughSeparatePort() const { return output_each_partition_through_separate_port; }
bool hasAnalyzedResult() const { return analyzed_result_ptr != nullptr; }
void setAnalyzedResult(MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); }
void setAnalyzedResult(AnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); }
const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; }
const std::vector<AlterConversionsPtr> & getAlterConvertionsForParts() const { return alter_conversions_for_parts; }
@ -209,7 +207,7 @@ public:
void applyFilters() override;
private:
static MergeTreeDataSelectAnalysisResultPtr selectRangesToReadImpl(
static AnalysisResultPtr selectRangesToReadImpl(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
const StorageMetadataPtr & metadata_snapshot_base,
@ -294,21 +292,11 @@ private:
RangesInDataParts && parts, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection);
ReadFromMergeTree::AnalysisResult getAnalysisResult() const;
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr;
AnalysisResultPtr analyzed_result_ptr;
bool is_parallel_reading_from_replicas;
std::optional<MergeTreeAllRangesCallback> all_ranges_callback;
std::optional<MergeTreeReadTaskCallback> read_task_callback;
};
/// Wrapper around the outcome of a MergeTree select analysis: it carries either
/// a ReadFromMergeTree::AnalysisResult or an exception captured in its place.
struct MergeTreeDataSelectAnalysisResult
{
/// Exactly one alternative is active: a captured exception or the analysis result.
std::variant<std::exception_ptr, ReadFromMergeTree::AnalysisResult> result;
/// True when `result` holds an exception.
bool error() const;
/// The accessors below rethrow the stored exception if there is one; otherwise
/// they return selected_marks, selected_rows and parts_with_ranges respectively.
size_t marks() const;
UInt64 rows() const;
const RangesInDataParts & partsWithRanges() const;
};
}

View File

@ -6930,7 +6930,7 @@ UInt64 MergeTreeData::estimateNumberOfRowsToRead(
query_context,
query_context->getSettingsRef().max_threads);
UInt64 total_rows = result_ptr->rows();
UInt64 total_rows = result_ptr->selected_rows;
if (query_info.limit > 0 && query_info.limit < total_rows)
total_rows = query_info.limit;
return total_rows;

View File

@ -135,9 +135,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
bool enable_parallel_reading) const
{
if (query_info.merge_tree_empty_result)
return std::make_unique<QueryPlan>();
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*storage_snapshot->data);
const auto & parts = snapshot_data.parts;
const auto & alter_conversions = snapshot_data.alter_conversions;
@ -933,7 +930,7 @@ static void selectColumnNames(
}
}
MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMarksToRead(
ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const PrewhereInfoPtr & prewhere_info,
const Names & column_names_to_return,
@ -947,8 +944,7 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
{
size_t total_parts = parts.size();
if (total_parts == 0)
return std::make_shared<MergeTreeDataSelectAnalysisResult>(
MergeTreeDataSelectAnalysisResult{.result = ReadFromMergeTree::AnalysisResult()});
return std::make_shared<ReadFromMergeTree::AnalysisResult>();
Names real_column_names;
Names virt_column_names;
@ -989,13 +985,13 @@ QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts(
const UInt64 max_block_size,
const size_t num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr,
ReadFromMergeTree::AnalysisResultPtr merge_tree_select_result_ptr,
bool enable_parallel_reading) const
{
/// If merge_tree_select_result_ptr != nullptr, we use analyzed result so parts will always be empty.
if (merge_tree_select_result_ptr)
{
if (merge_tree_select_result_ptr->marks() == 0)
if (merge_tree_select_result_ptr->selected_marks == 0)
return {};
}
else if (parts.empty())

View File

@ -48,13 +48,13 @@ public:
UInt64 max_block_size,
size_t num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr,
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr = nullptr,
ReadFromMergeTree::AnalysisResultPtr merge_tree_select_result_ptr = nullptr,
bool enable_parallel_reading = false) const;
/// Get an estimation for the number of marks we are going to read.
/// Reads nothing. Secondary indexes are not used.
/// This method is used to select best projection for table.
MergeTreeDataSelectAnalysisResultPtr estimateNumMarksToRead(
ReadFromMergeTree::AnalysisResultPtr estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const PrewhereInfoPtr & prewhere_info,
const Names & column_names,

View File

@ -37,7 +37,7 @@ public:
}
/// Used in queries with projection.
StorageFromMergeTreeDataPart(const MergeTreeData & storage_, MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr_)
StorageFromMergeTreeDataPart(const MergeTreeData & storage_, ReadFromMergeTree::AnalysisResultPtr analysis_result_ptr_)
: IStorage(storage_.getStorageID()), storage(storage_), analysis_result_ptr(analysis_result_ptr_)
{
setInMemoryMetadata(storage.getInMemoryMetadata());
@ -127,7 +127,7 @@ private:
const std::vector<AlterConversionsPtr> alter_conversions;
const MergeTreeData & storage;
const String partition_id;
const MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr;
const ReadFromMergeTree::AnalysisResultPtr analysis_result_ptr;
static StorageID getIDFromPart(const MergeTreeData::DataPartPtr & part_)
{

View File

@ -42,9 +42,6 @@ using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct MergeTreeDataSelectAnalysisResult;
using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelectAnalysisResult>;
struct PrewhereInfo
{
/// Actions for row level security filter. Applied separately before prewhere_actions.
@ -214,7 +211,6 @@ struct SelectQueryInfo
ClusterPtr getCluster() const { return !optimized_cluster ? cluster : optimized_cluster; }
bool merge_tree_empty_result = false;
bool settings_limit_offset_done = false;
bool is_internal = false;
bool parallel_replicas_disabled = false;