Improve projection analysis.

Remove duplicate index analysis and avoid possibly invalid limit checks
during projection analysis.
Amos Bird 2021-08-16 20:09:18 +08:00
parent c4a14ffca2
commit b162a2b699
7 changed files with 250 additions and 148 deletions
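
The change can be summarized as: run the MergeTree range/index analysis once, wrap the result in a shared object, and let every later consumer (projection selection, query planning, EXPLAIN output) reuse that object instead of re-analyzing. The standalone sketch below only illustrates the pattern; the names (Analysis, ReadStep, getAnalysis) are invented for the example, not the actual ClickHouse API.

#include <cstddef>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

// Sketch of "analyze once, reuse everywhere": a read step either receives an
// already-computed analysis or computes it lazily on first use.
struct Analysis
{
    size_t selected_parts = 0;
    size_t selected_marks = 0;
    bool is_analyzed = false;
};

using AnalysisPtr = std::shared_ptr<Analysis>;

class ReadStep
{
public:
    // A precomputed analysis may be injected (e.g. by an earlier projection-selection pass).
    explicit ReadStep(std::vector<size_t> marks_per_part_, AnalysisPtr precomputed = nullptr)
        : marks_per_part(std::move(marks_per_part_))
    {
        if (precomputed)
            analyzed = *precomputed;
    }

    void initializePipeline() { report("initializePipeline"); }
    void describeActions()    { report("describeActions"); }

private:
    const Analysis & getAnalysis()
    {
        if (!analyzed.is_analyzed)  // analyze only once, then reuse the cached result
        {
            for (size_t marks : marks_per_part)
            {
                ++analyzed.selected_parts;
                analyzed.selected_marks += marks;
            }
            analyzed.is_analyzed = true;
        }
        return analyzed;
    }

    void report(const char * where)
    {
        const Analysis & a = getAnalysis();
        std::cout << where << ": " << a.selected_marks << " marks in " << a.selected_parts << " parts\n";
    }

    std::vector<size_t> marks_per_part;
    Analysis analyzed;
};

int main()
{
    ReadStep step({8, 4, 20});   // no precomputed result: analysis runs once, on first use
    step.initializePipeline();
    step.describeActions();      // reuses the cached analysis instead of recomputing it
}

In the actual commit the analogous cached object is ReadFromMergeTree::AnalysisResult, passed around as MergeTreeDataSelectAnalysisResultPtr.
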

src/Processors/QueryPlan/ReadFromMergeTree.cpp

@ -40,18 +40,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
struct ReadFromMergeTree::AnalysisResult
{
RangesInDataParts parts_with_ranges;
MergeTreeDataSelectSamplingData sampling;
IndexStats index_stats;
Names column_names_to_read;
ReadFromMergeTree::ReadType read_type = ReadFromMergeTree::ReadType::Default;
UInt64 selected_rows = 0;
UInt64 selected_marks = 0;
UInt64 selected_parts = 0;
};
static MergeTreeReaderSettings getMergeTreeReaderSettings(const ContextPtr & context)
{
const auto & settings = context->getSettingsRef();
@ -84,7 +72,8 @@ ReadFromMergeTree::ReadFromMergeTree(
size_t num_streams_,
bool sample_factor_column_queried_,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_)
Poco::Logger * log_,
MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr)
: ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader(
metadata_snapshot_->getSampleBlockForColumns(real_column_names_, data_.getVirtuals(), data_.getStorageID()),
getPrewhereInfo(query_info_),
@ -116,6 +105,10 @@ ReadFromMergeTree::ReadFromMergeTree(
auto type = std::make_shared<DataTypeFloat64>();
output_stream->header.insert({type->createColumn(), type, "_sample_factor"});
}
/// If we have an analyzed result, reuse it for future planning.
if (analysis_result_ptr)
analyzed_result = analysis_result_ptr->result;
}
Pipe ReadFromMergeTree::readFromPool(
@ -780,6 +773,33 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
}
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTreeData::DataPartsVector parts) const
{
return selectRangesToRead(
std::move(parts),
metadata_snapshot_base,
metadata_snapshot,
query_info,
context,
requested_num_streams,
max_block_numbers_to_read,
data,
real_column_names,
sample_factor_column_queried,
log);
}
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
const MergeTreeData & data,
const Names & real_column_names,
bool sample_factor_column_queried,
Poco::Logger * log)
{
AnalysisResult result;
const auto & settings = context->getSettingsRef();
@ -808,10 +828,10 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
{
throw Exception(
ErrorCodes::INDEX_NOT_USED,
"Primary key ({}) is not used and setting 'force_primary_key' is set.",
fmt::join(primary_key_columns, ", "));
result.error_msg
= fmt::format("Primary key ({}) is not used and setting 'force_primary_key' is set.", fmt::join(primary_key_columns, ", "));
result.error_code = ErrorCodes::INDEX_NOT_USED;
return result;
}
LOG_DEBUG(log, "Key condition: {}", key_condition.toString());
@ -819,11 +839,30 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
MergeTreeDataSelectExecutor::filterPartsByPartition(
parts, part_values, metadata_snapshot_base, data, query_info, context,
max_block_numbers_to_read.get(), log, result.index_stats);
max_block_numbers_to_read.get(), log, result);
result.sampling = MergeTreeDataSelectExecutor::getSampling(
select, metadata_snapshot->getColumns().getAllPhysical(), parts, key_condition,
data, metadata_snapshot, context, sample_factor_column_queried, log);
if (result.error_code)
return result;
try
{
result.sampling = MergeTreeDataSelectExecutor::getSampling(
select,
metadata_snapshot->getColumns().getAllPhysical(),
parts,
key_condition,
data,
metadata_snapshot,
context,
sample_factor_column_queried,
log);
}
catch (Exception & e)
{
result.error_code = e.code();
result.error_msg = e.message();
return result;
}
if (result.sampling.read_nothing)
return result;
@ -834,18 +873,27 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
size_t parts_before_pk = parts.size();
result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
std::move(parts),
metadata_snapshot,
query_info,
context,
key_condition,
reader_settings,
log,
requested_num_streams,
result.index_stats,
true /* use_skip_indexes */,
true /* check_limits */);
try
{
auto reader_settings = getMergeTreeReaderSettings(context);
result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
std::move(parts),
metadata_snapshot,
query_info,
context,
key_condition,
reader_settings,
log,
num_streams,
result.index_stats,
true /* use_skip_indexes */);
}
catch (Exception & e)
{
result.error_code = e.code();
result.error_msg = e.message();
return result;
}
size_t sum_marks_pk = total_marks_pk;
for (const auto & stat : result.index_stats)
@ -862,23 +910,15 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
sum_marks += part.getMarksCount();
sum_rows += part.getRowsCount();
}
result.selected_parts = result.parts_with_ranges.size();
result.selected_marks = sum_marks;
result.selected_rows = sum_rows;
LOG_DEBUG(
log,
"Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
parts_before_pk,
total_parts,
result.parts_with_ranges.size(),
sum_marks_pk,
total_marks_pk,
sum_marks,
sum_ranges);
ProfileEvents::increment(ProfileEvents::SelectedParts, result.parts_with_ranges.size());
ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);
result.total_parts = total_parts;
result.parts_before_pk = parts_before_pk;
result.selected_parts = result.parts_with_ranges.size();
result.selected_ranges = sum_ranges;
result.selected_marks = sum_marks;
result.selected_marks_pk = sum_marks_pk;
result.total_marks_pk = total_marks_pk;
result.selected_rows = sum_rows;
const auto & input_order_info = query_info.input_order_info
? query_info.input_order_info
@ -893,7 +933,26 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
auto result = selectRangesToRead(prepared_parts);
auto result = analyzed_result.is_analyzed ? std::move(analyzed_result) : selectRangesToRead(prepared_parts);
if (result.error_code)
throw Exception(result.error_msg, result.error_code);
LOG_DEBUG(
log,
"Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
result.parts_before_pk,
result.total_parts,
result.selected_parts,
result.selected_marks_pk,
result.total_marks_pk,
result.selected_marks,
result.selected_ranges);
ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);
ProfileEvents::increment(ProfileEvents::SelectedRanges, result.selected_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, result.selected_marks);
auto query_id_holder = MergeTreeDataSelectExecutor::checkLimits(data, result.parts_with_ranges, context);
if (result.parts_with_ranges.empty())
@ -1084,7 +1143,7 @@ static const char * readTypeToString(ReadFromMergeTree::ReadType type)
void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
{
auto result = selectRangesToRead(prepared_parts);
auto result = analyzed_result.is_analyzed ? std::move(analyzed_result) : selectRangesToRead(prepared_parts);
std::string prefix(format_settings.offset, format_settings.indent_char);
format_settings.out << prefix << "ReadType: " << readTypeToString(result.read_type) << '\n';
@ -1097,7 +1156,7 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
{
auto result = selectRangesToRead(prepared_parts);
auto result = analyzed_result.is_analyzed ? std::move(analyzed_result) : selectRangesToRead(prepared_parts);
map.add("Read Type", readTypeToString(result.read_type));
if (!result.index_stats.empty())
{
@ -1108,7 +1167,7 @@ void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
{
auto result = selectRangesToRead(prepared_parts);
auto result = analyzed_result.is_analyzed ? std::move(analyzed_result) : selectRangesToRead(prepared_parts);
auto index_stats = std::move(result.index_stats);
std::string prefix(format_settings.offset, format_settings.indent_char);
@ -1160,7 +1219,7 @@ void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const
{
auto result = selectRangesToRead(prepared_parts);
auto result = analyzed_result.is_analyzed ? std::move(analyzed_result) : selectRangesToRead(prepared_parts);
auto index_stats = std::move(result.index_stats);
if (!index_stats.empty())

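A notable detail above: selectRangesToRead no longer throws when force_primary_key or force_index_by_date fail, or when getSampling / filterPartsByPrimaryKeyAndSkipIndexes raise; it records error_code and error_msg in the result, and initializePipeline rethrows later. That lets projection selection run the same analysis speculatively and simply discard failing candidates. A minimal sketch of the pattern, using invented names (Result, analyze, initializePipeline):

#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <string>

// Sketch: carry failures inside the analysis result instead of throwing,
// so speculative callers can probe alternatives cheaply and the exception
// is raised only when the plan is really built.
struct Result
{
    size_t marks_to_read = 0;
    int error_code = 0;      // non-zero means the analysis failed
    std::string error_msg;
};

Result analyze(bool primary_key_usable)
{
    Result r;
    if (!primary_key_usable)
    {
        r.error_code = 1;    // any non-zero code; INDEX_NOT_USED plays this role in the real change
        r.error_msg = "Primary key is not used and setting 'force_primary_key' is set.";
        return r;            // report instead of throwing: the caller may just skip this candidate
    }
    r.marks_to_read = 42;
    return r;
}

void initializePipeline(const Result & r)
{
    if (r.error_code)
        throw std::runtime_error(r.error_msg);  // the deferred error finally surfaces here
    std::cout << "reading " << r.marks_to_read << " marks\n";
}

int main()
{
    Result candidate = analyze(/*primary_key_usable=*/false);
    if (candidate.error_code)
        std::cout << "candidate rejected: " << candidate.error_msg << '\n';  // no exception thrown

    initializePipeline(analyze(/*primary_key_usable=*/true));
}
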
src/Processors/QueryPlan/ReadFromMergeTree.h

@ -9,6 +9,18 @@ using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
class Pipe;
struct MergeTreeDataSelectSamplingData
{
bool use_sampling = false;
bool read_nothing = false;
Float64 used_sample_factor = 1.0;
std::shared_ptr<ASTFunction> filter_function;
ActionsDAGPtr filter_expression;
};
struct MergeTreeDataSelectAnalysisResult;
using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelectAnalysisResult>;
/// This step is created to read from MergeTree* table.
/// For now, it takes a list of parts and creates source from it.
class ReadFromMergeTree final : public ISourceStep
@ -54,6 +66,28 @@ public:
InReverseOrder,
};
struct AnalysisResult
{
RangesInDataParts parts_with_ranges;
MergeTreeDataSelectSamplingData sampling;
IndexStats index_stats;
Names column_names_to_read;
ReadFromMergeTree::ReadType read_type = ReadFromMergeTree::ReadType::Default;
UInt64 total_parts = 0;
UInt64 parts_before_pk = 0;
UInt64 selected_parts = 0;
UInt64 selected_ranges = 0;
UInt64 selected_marks = 0;
UInt64 selected_marks_pk = 0;
UInt64 total_marks_pk = 0;
UInt64 selected_rows = 0;
bool is_analyzed = false;
// If error_code is not zero, throw an error during initializePipeline.
int error_code = 0;
String error_msg;
};
ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
Names real_column_names_,
@ -67,7 +101,8 @@ public:
size_t num_streams_,
bool sample_factor_column_queried_,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_
Poco::Logger * log_,
MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr
);
String getName() const override { return "ReadFromMergeTree"; }
@ -84,6 +119,20 @@ public:
UInt64 getSelectedParts() const { return selected_parts; }
UInt64 getSelectedRows() const { return selected_rows; }
UInt64 getSelectedMarks() const { return selected_marks; }
static ReadFromMergeTree::AnalysisResult selectRangesToRead(
MergeTreeData::DataPartsVector parts,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
const MergeTreeData & data,
const Names & real_column_names,
bool sample_factor_column_queried,
Poco::Logger * log);
private:
const MergeTreeReaderSettings reader_settings;
@ -137,8 +186,14 @@ private:
const Names & column_names,
ActionsDAGPtr & out_projection);
struct AnalysisResult;
AnalysisResult selectRangesToRead(MergeTreeData::DataPartsVector parts) const;
ReadFromMergeTree::AnalysisResult selectRangesToRead(MergeTreeData::DataPartsVector parts) const;
AnalysisResult analyzed_result;
};
// Wrapper needed because a nested struct (ReadFromMergeTree::AnalysisResult) cannot be forward-declared.
struct MergeTreeDataSelectAnalysisResult
{
ReadFromMergeTree::AnalysisResult result;
};
}

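MergeTreeDataSelectAnalysisResult exists only because ReadFromMergeTree::AnalysisResult is a nested type, and nested types cannot be forward-declared; wrapping it in a top-level struct lets SelectQueryInfo.h and MergeTreeDataSelectExecutor.h hold a shared_ptr to it with just a forward declaration. A compressed single-file sketch of that idiom (file boundaries shown as comments, names illustrative):

#include <iostream>
#include <memory>

// --- "consumer.h": only a forward declaration is available here. ---
struct StepAnalysisResult;                                  // Step::Result itself cannot be forward-declared
using StepAnalysisResultPtr = std::shared_ptr<StepAnalysisResult>;

struct QueryInfo
{
    StepAnalysisResultPtr analysis;   // held by pointer, so the type may stay incomplete
};

// --- "step.h": the full definitions. ---
struct Step
{
    struct Result
    {
        unsigned long long selected_marks = 0;
    };
};

// The wrapper's only job is to give the nested type a forward-declarable name.
struct StepAnalysisResult
{
    Step::Result result;
};

// --- "step.cpp": usage. ---
int main()
{
    QueryInfo info;
    info.analysis = std::make_shared<StepAnalysisResult>();
    info.analysis->result.selected_marks = 42;
    std::cout << info.analysis->result.selected_marks << '\n';
}
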
src/Storages/MergeTree/MergeTreeData.cpp

@ -51,6 +51,7 @@
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/algorithm/string/join.hpp>
@ -3940,7 +3941,7 @@ static void selectBestProjection(
if (projection_parts.empty())
return;
auto sum_marks = reader.estimateNumMarksToRead(
auto projection_result = reader.estimateNumMarksToRead(
projection_parts,
candidate.required_columns,
metadata_snapshot,
@ -3950,6 +3951,10 @@ static void selectBestProjection(
settings.max_threads,
max_added_blocks);
if (projection_result.error_code)
return;
auto sum_marks = projection_result.index_stats.back().num_granules_after;
if (normal_parts.empty())
{
// All parts are projection parts which allows us to use in_order_optimization.
@ -3958,7 +3963,7 @@ static void selectBestProjection(
}
else
{
sum_marks += reader.estimateNumMarksToRead(
auto normal_result = reader.estimateNumMarksToRead(
normal_parts,
required_columns,
metadata_snapshot,
@ -3967,7 +3972,16 @@ static void selectBestProjection(
query_context,
settings.max_threads,
max_added_blocks);
if (normal_result.error_code)
return;
sum_marks += normal_result.index_stats.back().num_granules_after;
candidate.merge_tree_normal_select_result_ptr
= std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(normal_result)});
}
candidate.merge_tree_projection_select_result_ptr
= std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(projection_result)});
// We choose the projection with the least sum_marks to read.
if (sum_marks < min_sum_marks)
@ -4217,7 +4231,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
/// Select the best normal projection if no aggregate projection is available
if (!selected_candidate && has_ordinary_projection)
{
min_sum_marks = reader.estimateNumMarksToRead(
auto result = reader.estimateNumMarksToRead(
parts,
analysis_result.required_columns,
metadata_snapshot,
@ -4229,7 +4243,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
// Add 1 to the base sum_marks so that we prefer projections even when they have an equal number of marks to read.
// NOTE: It is not clear if we need this. E.g. projections do not support skip indexes for now.
min_sum_marks += 1;
min_sum_marks = result.index_stats.back().num_granules_after + 1;
for (auto & candidate : candidates)
{
@ -4249,6 +4263,14 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
min_sum_marks);
}
}
if (!selected_candidate)
{
// We don't have any good projections; store the MergeTreeDataSelectAnalysisResult for the normal scan.
query_info.merge_tree_select_result_ptr = std::make_shared<MergeTreeDataSelectAnalysisResult>(
MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
return false;
}
}
if (!selected_candidate)
@ -4261,7 +4283,6 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
}
query_info.projection = std::move(*selected_candidate);
return true;
}
return false;

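In selectBestProjection and getQueryProcessingStageWithAggregateProjection above, each candidate's cost is now taken from the analysis result (index_stats.back().num_granules_after), the base-table estimate gets +1 so that projections win ties, and the candidate with the fewest marks is chosen. A minimal sketch of that selection loop, with made-up names and numbers:

#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Sketch: pick the read path with the fewest estimated marks.
struct Candidate
{
    std::string name;
    size_t estimated_marks = 0;   // in the real change: index_stats.back().num_granules_after
};

int main()
{
    // Estimate for scanning the base table, plus 1 so a projection wins ties.
    size_t min_sum_marks = 100 + 1;

    std::vector<Candidate> candidates = {
        {"proj_by_date", 100},    // ties with the base table, still preferred thanks to the +1
        {"proj_by_user", 40},     // fewer marks, wins overall
    };

    std::optional<Candidate> selected;
    for (const auto & candidate : candidates)
    {
        if (candidate.estimated_marks < min_sum_marks)
        {
            selected = candidate;
            min_sum_marks = candidate.estimated_marks;
        }
    }

    if (selected)
        std::cout << "use projection " << selected->name << " (" << min_sum_marks << " marks)\n";
    else
        std::cout << "fall back to a normal scan\n";
}
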
src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp

@ -145,7 +145,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
context,
max_block_size,
num_streams,
max_block_numbers_to_read);
max_block_numbers_to_read,
query_info.merge_tree_select_result_ptr);
if (plan->isInitialized() && settings.allow_experimental_projection_optimization && settings.force_optimize_projection
&& !metadata_snapshot->projections.empty())
@ -190,7 +191,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
context,
max_block_size,
num_streams,
max_block_numbers_to_read);
max_block_numbers_to_read,
query_info.projection->merge_tree_projection_select_result_ptr);
if (plan)
{
@ -224,7 +226,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
if (!normal_parts.empty())
{
auto storage_from_base_parts_of_projection = StorageFromMergeTreeDataPart::create(std::move(normal_parts));
auto storage_from_base_parts_of_projection
= StorageFromMergeTreeDataPart::create(std::move(normal_parts), query_info.projection->merge_tree_normal_select_result_ptr);
auto interpreter = InterpreterSelectQuery(
query_info.query,
context,
@ -666,7 +669,7 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
const ContextPtr & context,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
Poco::Logger * log,
ReadFromMergeTree::IndexStats & index_stats)
ReadFromMergeTree::AnalysisResult & result)
{
const Settings & settings = context->getSettingsRef();
std::optional<PartitionPruner> partition_pruner;
@ -696,7 +699,9 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
}
msg += ") nor partition expr is used and setting 'force_index_by_date' is set";
throw Exception(msg, ErrorCodes::INDEX_NOT_USED);
result.error_msg = msg;
result.error_code = ErrorCodes::INDEX_NOT_USED;
return;
}
}
@ -724,7 +729,7 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
max_block_numbers_to_read,
part_filter_counters);
index_stats.emplace_back(ReadFromMergeTree::IndexStat{
result.index_stats.emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::None,
.num_parts_after = part_filter_counters.num_initial_selected_parts,
.num_granules_after = part_filter_counters.num_initial_selected_granules});
@ -732,7 +737,7 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
if (minmax_idx_condition)
{
auto description = minmax_idx_condition->getDescription();
index_stats.emplace_back(ReadFromMergeTree::IndexStat{
result.index_stats.emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::MinMax,
.condition = std::move(description.condition),
.used_keys = std::move(description.used_keys),
@ -744,7 +749,7 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
if (partition_pruner)
{
auto description = partition_pruner->getKeyCondition().getDescription();
index_stats.emplace_back(ReadFromMergeTree::IndexStat{
result.index_stats.emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::Partition,
.condition = std::move(description.condition),
.used_keys = std::move(description.used_keys),
@ -763,8 +768,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
Poco::Logger * log,
size_t num_streams,
ReadFromMergeTree::IndexStats & index_stats,
bool use_skip_indexes,
bool check_limits)
bool use_skip_indexes)
{
RangesInDataParts parts_with_ranges(parts.size());
const Settings & settings = context->getSettingsRef();
@ -892,7 +896,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
if (!ranges.ranges.empty())
{
if (check_limits && (limits.max_rows || leaf_limits.max_rows))
if (limits.max_rows || leaf_limits.max_rows)
{
/// Fail fast if estimated number of rows to read exceeds the limit
auto current_rows_estimate = ranges.getRowsCount();
@ -1082,7 +1086,7 @@ static void selectColumnNames(
}
}
size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
ReadFromMergeTree::AnalysisResult MergeTreeDataSelectExecutor::estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot_base,
@ -1094,7 +1098,11 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
{
size_t total_parts = parts.size();
if (total_parts == 0)
return 0;
{
ReadFromMergeTree::AnalysisResult result;
result.is_analyzed = true;
return result;
}
Names real_column_names;
Names virt_column_names;
@ -1104,63 +1112,18 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
auto part_values = filterPartsByVirtualColumns(data, parts, query_info.query, context);
if (part_values && part_values->empty())
return 0;
/// If there are only virtual columns in the query, you must request at least one non-virtual one.
if (real_column_names.empty())
{
NamesAndTypesList available_real_columns = metadata_snapshot->getColumns().getAllPhysical();
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
}
metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID());
const auto & primary_key = metadata_snapshot->getPrimaryKey();
Names primary_key_columns = primary_key.column_names;
KeyCondition key_condition(query_info, context, primary_key_columns, primary_key.expression);
if (key_condition.alwaysUnknownOrTrue())
{
size_t total_marks = 0;
for (const auto & part : parts)
total_marks += part->index_granularity.getMarksCountWithoutFinal();
return total_marks;
}
const auto & select = query_info.query->as<ASTSelectQuery &>();
ReadFromMergeTree::IndexStats index_stats;
filterPartsByPartition(
parts, part_values, metadata_snapshot_base, data, query_info,
context, max_block_numbers_to_read.get(), log, index_stats);
auto sampling = MergeTreeDataSelectExecutor::getSampling(
select, metadata_snapshot->getColumns().getAllPhysical(), parts, key_condition,
data, metadata_snapshot, context, sample_factor_column_queried, log);
if (sampling.read_nothing)
return 0;
/// Do not init. It is not used (cause skip index is ignored)
MergeTreeReaderSettings reader_settings;
auto parts_with_ranges = filterPartsByPrimaryKeyAndSkipIndexes(
return ReadFromMergeTree::selectRangesToRead(
std::move(parts),
metadata_snapshot_base,
metadata_snapshot,
query_info,
context,
key_condition,
reader_settings,
log,
num_streams,
index_stats,
true /* use_skip_indexes */,
false /* check_limits */);
return index_stats.back().num_granules_after;
max_block_numbers_to_read,
data,
real_column_names,
sample_factor_column_queried,
log);
}
QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
@ -1172,7 +1135,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
ContextPtr context,
const UInt64 max_block_size,
const unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr) const
{
size_t total_parts = parts.size();
if (total_parts == 0)
@ -1187,7 +1151,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
auto read_from_merge_tree = std::make_unique<ReadFromMergeTree>(
parts,
std::move(parts),
real_column_names,
virt_column_names,
data,
@ -1199,7 +1163,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
num_streams,
sample_factor_column_queried,
max_block_numbers_to_read,
log
log,
merge_tree_select_result_ptr
);
QueryPlanPtr plan = std::make_unique<QueryPlan>();

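estimateNumMarksToRead above no longer re-implements the analysis with its own, slightly different rules (check_limits = false, skip indexes ignored); it delegates to ReadFromMergeTree::selectRangesToRead and returns the full AnalysisResult, from which the caller reads the mark count and which it can cache for planning. A small sketch of returning the whole analysis instead of a bare number, with illustrative names:

#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

// Sketch: return the whole analysis instead of a single number, so the caller
// can both extract the estimate and keep the result for later reuse.
struct IndexStat
{
    size_t num_granules_after = 0;
};

struct Analysis
{
    std::vector<IndexStat> index_stats;
    int error_code = 0;
};

Analysis estimateMarks()
{
    Analysis analysis;
    analysis.index_stats.push_back({.num_granules_after = 120});  // e.g. after partition pruning
    analysis.index_stats.push_back({.num_granules_after = 30});   // e.g. after the primary key
    return analysis;
}

int main()
{
    Analysis analysis = estimateMarks();
    if (analysis.error_code)
        return 0;  // unusable candidate, nothing to cache

    // The last entry reflects every filtering stage that actually ran.
    size_t sum_marks = analysis.index_stats.back().num_granules_after;
    auto cached = std::make_shared<Analysis>(std::move(analysis));  // reused later when the plan is built

    std::cout << "estimated marks: " << sum_marks << ", cached for planning: " << (cached != nullptr) << '\n';
}
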
src/Storages/MergeTree/MergeTreeDataSelectExecutor.h

@ -13,15 +13,6 @@ namespace DB
class KeyCondition;
struct MergeTreeDataSelectSamplingData
{
bool use_sampling = false;
bool read_nothing = false;
Float64 used_sample_factor = 1.0;
std::shared_ptr<ASTFunction> filter_function;
ActionsDAGPtr filter_expression;
};
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
/** Executes SELECT queries on data from the merge tree.
@ -55,12 +46,13 @@ public:
ContextPtr context,
UInt64 max_block_size,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr,
MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr = nullptr) const;
/// Get an estimation for the number of marks we are going to read.
/// Reads nothing. Secondary indexes are not used.
/// This method is used to select the best projection for a table.
size_t estimateNumMarksToRead(
ReadFromMergeTree::AnalysisResult estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot_base,
@ -100,6 +92,8 @@ private:
size_t & granules_dropped,
Poco::Logger * log);
friend class ReadFromMergeTree;
struct PartFilterCounters
{
size_t num_initial_selected_parts = 0;
@ -170,7 +164,7 @@ public:
const ContextPtr & context,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
Poco::Logger * log,
ReadFromMergeTree::IndexStats & index_stats);
ReadFromMergeTree::AnalysisResult & result);
/// Filter parts using primary key and secondary indexes.
/// For every part, select mark ranges to read.
@ -185,8 +179,7 @@ public:
Poco::Logger * log,
size_t num_streams,
ReadFromMergeTree::IndexStats & index_stats,
bool use_skip_indexes,
bool check_limits);
bool use_skip_indexes);
/// Create expression for sampling.
/// Also, calculate _sample_factor if needed.

src/Storages/MergeTree/StorageFromMergeTreeDataPart.h

@ -41,7 +41,9 @@ public:
query_info,
context,
max_block_size,
num_streams));
num_streams,
nullptr,
analysis_result_ptr));
return query_plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
@ -80,15 +82,16 @@ protected:
setInMemoryMetadata(part_->storage.getInMemoryMetadata());
}
StorageFromMergeTreeDataPart(MergeTreeData::DataPartsVector && parts_)
: IStorage(getIDFromParts(parts_))
, parts(std::move(parts_))
StorageFromMergeTreeDataPart(
MergeTreeData::DataPartsVector && parts_, MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr_ = nullptr)
: IStorage(getIDFromParts(parts_)), parts(std::move(parts_)), analysis_result_ptr(analysis_result_ptr_)
{
setInMemoryMetadata(parts.front()->storage.getInMemoryMetadata());
}
private:
MergeTreeData::DataPartsVector parts;
MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr;
static StorageID getIDFromPart(const MergeTreeData::DataPartPtr & part_)
{

src/Storages/SelectQueryInfo.h

@ -39,6 +39,9 @@ using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct MergeTreeDataSelectAnalysisResult;
using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelectAnalysisResult>;
struct PrewhereInfo
{
/// Actions which are executed in order to alias columns are used for prewhere actions.
@ -118,6 +121,8 @@ struct ProjectionCandidate
ReadInOrderOptimizerPtr order_optimizer;
InputOrderInfoPtr input_order_info;
ManyExpressionActions group_by_elements_actions;
MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr;
MergeTreeDataSelectAnalysisResultPtr merge_tree_normal_select_result_ptr;
};
/** Query along with some additional data,
@ -158,6 +163,7 @@ struct SelectQueryInfo
std::optional<ProjectionCandidate> projection;
bool ignore_projections = false;
bool is_projection_query = false;
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr;
};
}