Merge pull request #27742 from amosbird/projection-improvement3

Improve projection analysis.
This commit is contained in:
Nikolai Kochetov 2021-08-19 12:11:20 +03:00 committed by GitHub
commit 7dcff5c90e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 303 additions and 225 deletions

View File

@ -40,18 +40,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
struct ReadFromMergeTree::AnalysisResult
{
RangesInDataParts parts_with_ranges;
MergeTreeDataSelectSamplingData sampling;
IndexStats index_stats;
Names column_names_to_read;
ReadFromMergeTree::ReadType read_type = ReadFromMergeTree::ReadType::Default;
UInt64 selected_rows = 0;
UInt64 selected_marks = 0;
UInt64 selected_parts = 0;
};
static MergeTreeReaderSettings getMergeTreeReaderSettings(const ContextPtr & context)
{
const auto & settings = context->getSettingsRef();
@ -84,7 +72,8 @@ ReadFromMergeTree::ReadFromMergeTree(
size_t num_streams_,
bool sample_factor_column_queried_,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_)
Poco::Logger * log_,
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_)
: ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader(
metadata_snapshot_->getSampleBlockForColumns(real_column_names_, data_.getVirtuals(), data_.getStorageID()),
getPrewhereInfo(query_info_),
@ -108,6 +97,7 @@ ReadFromMergeTree::ReadFromMergeTree(
, sample_factor_column_queried(sample_factor_column_queried_)
, max_block_numbers_to_read(std::move(max_block_numbers_to_read_))
, log(log_)
, analyzed_result_ptr(analyzed_result_ptr_)
{
if (sample_factor_column_queried)
{
@ -779,7 +769,34 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
return Pipe::unitePipes(std::move(partition_pipes));
}
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTreeData::DataPartsVector parts) const
/// Convenience overload: runs the static selectRangesToRead() for the given parts,
/// supplying every other argument from this step's own members
/// (metadata snapshots, query info, context, requested stream count, etc.).
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(MergeTreeData::DataPartsVector parts) const
{
return selectRangesToRead(
std::move(parts),
metadata_snapshot_base,
metadata_snapshot,
query_info,
context,
requested_num_streams,
max_block_numbers_to_read,
data,
real_column_names,
sample_factor_column_queried,
log);
}
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
const MergeTreeData & data,
const Names & real_column_names,
bool sample_factor_column_queried,
Poco::Logger * log)
{
AnalysisResult result;
const auto & settings = context->getSettingsRef();
@ -788,7 +805,7 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
auto part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(data, parts, query_info.query, context);
if (part_values && part_values->empty())
return result;
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
result.column_names_to_read = real_column_names;
@ -808,44 +825,66 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
{
throw Exception(
ErrorCodes::INDEX_NOT_USED,
"Primary key ({}) is not used and setting 'force_primary_key' is set.",
fmt::join(primary_key_columns, ", "));
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{
.result = std::make_exception_ptr(Exception(
ErrorCodes::INDEX_NOT_USED,
"Primary key ({}) is not used and setting 'force_primary_key' is set.",
fmt::join(primary_key_columns, ", ")))});
}
LOG_DEBUG(log, "Key condition: {}", key_condition.toString());
const auto & select = query_info.query->as<ASTSelectQuery &>();
MergeTreeDataSelectExecutor::filterPartsByPartition(
parts, part_values, metadata_snapshot_base, data, query_info, context,
max_block_numbers_to_read.get(), log, result.index_stats);
result.sampling = MergeTreeDataSelectExecutor::getSampling(
select, metadata_snapshot->getColumns().getAllPhysical(), parts, key_condition,
data, metadata_snapshot, context, sample_factor_column_queried, log);
if (result.sampling.read_nothing)
return result;
size_t total_marks_pk = 0;
for (const auto & part : parts)
total_marks_pk += part->index_granularity.getMarksCountWithoutFinal();
size_t parts_before_pk = 0;
try
{
MergeTreeDataSelectExecutor::filterPartsByPartition(
parts,
part_values,
metadata_snapshot_base,
data,
query_info,
context,
max_block_numbers_to_read.get(),
log,
result.index_stats);
size_t parts_before_pk = parts.size();
result.sampling = MergeTreeDataSelectExecutor::getSampling(
select,
metadata_snapshot->getColumns().getAllPhysical(),
parts,
key_condition,
data,
metadata_snapshot,
context,
sample_factor_column_queried,
log);
result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
std::move(parts),
metadata_snapshot,
query_info,
context,
key_condition,
reader_settings,
log,
requested_num_streams,
result.index_stats,
true /* use_skip_indexes */,
true /* check_limits */);
if (result.sampling.read_nothing)
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
for (const auto & part : parts)
total_marks_pk += part->index_granularity.getMarksCountWithoutFinal();
parts_before_pk = parts.size();
auto reader_settings = getMergeTreeReaderSettings(context);
result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
std::move(parts),
metadata_snapshot,
query_info,
context,
key_condition,
reader_settings,
log,
num_streams,
result.index_stats,
true /* use_skip_indexes */);
}
catch (...)
{
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::current_exception()});
}
size_t sum_marks_pk = total_marks_pk;
for (const auto & stat : result.index_stats)
@ -862,23 +901,15 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
sum_marks += part.getMarksCount();
sum_rows += part.getRowsCount();
}
result.selected_parts = result.parts_with_ranges.size();
result.selected_marks = sum_marks;
result.selected_rows = sum_rows;
LOG_DEBUG(
log,
"Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
parts_before_pk,
total_parts,
result.parts_with_ranges.size(),
sum_marks_pk,
total_marks_pk,
sum_marks,
sum_ranges);
ProfileEvents::increment(ProfileEvents::SelectedParts, result.parts_with_ranges.size());
ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);
result.total_parts = total_parts;
result.parts_before_pk = parts_before_pk;
result.selected_parts = result.parts_with_ranges.size();
result.selected_ranges = sum_ranges;
result.selected_marks = sum_marks;
result.selected_marks_pk = sum_marks_pk;
result.total_marks_pk = total_marks_pk;
result.selected_rows = sum_rows;
const auto & input_order_info = query_info.input_order_info
? query_info.input_order_info
@ -888,12 +919,36 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
result.read_type = (input_order_info->direction > 0) ? ReadType::InOrder
: ReadType::InReverseOrder;
return result;
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
}
/// Returns the analysis result for this step.
/// Uses the pre-computed `analyzed_result_ptr` when the step was constructed with one,
/// otherwise runs selectRangesToRead() over `prepared_parts`.
/// If the analysis stored an exception instead of a result, that exception is rethrown.
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
{
    auto result_ptr = analyzed_result_ptr ? analyzed_result_ptr : selectRangesToRead(prepared_parts);

    /// The result object is shared and this method is called several times per step
    /// (initializePipeline, describeActions, describeIndexes). The stored exception must
    /// therefore be copied, not moved out: a moved-from exception_ptr may be null, and
    /// rethrowing a null exception_ptr on the next call is undefined behaviour.
    if (const auto * stored_exception = std::get_if<std::exception_ptr>(&result_ptr->result))
        std::rethrow_exception(*stored_exception);

    return std::get<ReadFromMergeTree::AnalysisResult>(result_ptr->result);
}
void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
auto result = selectRangesToRead(prepared_parts);
auto result = getAnalysisResult();
LOG_DEBUG(
log,
"Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
result.parts_before_pk,
result.total_parts,
result.selected_parts,
result.selected_marks_pk,
result.total_marks_pk,
result.selected_marks,
result.selected_ranges);
ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);
ProfileEvents::increment(ProfileEvents::SelectedRanges, result.selected_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, result.selected_marks);
auto query_id_holder = MergeTreeDataSelectExecutor::checkLimits(data, result.parts_with_ranges, context);
if (result.parts_with_ranges.empty())
@ -1084,7 +1139,7 @@ static const char * readTypeToString(ReadFromMergeTree::ReadType type)
void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
{
auto result = selectRangesToRead(prepared_parts);
auto result = getAnalysisResult();
std::string prefix(format_settings.offset, format_settings.indent_char);
format_settings.out << prefix << "ReadType: " << readTypeToString(result.read_type) << '\n';
@ -1097,7 +1152,7 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
{
auto result = selectRangesToRead(prepared_parts);
auto result = getAnalysisResult();
map.add("Read Type", readTypeToString(result.read_type));
if (!result.index_stats.empty())
{
@ -1108,7 +1163,7 @@ void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
{
auto result = selectRangesToRead(prepared_parts);
auto result = getAnalysisResult();
auto index_stats = std::move(result.index_stats);
std::string prefix(format_settings.offset, format_settings.indent_char);
@ -1160,7 +1215,7 @@ void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const
{
auto result = selectRangesToRead(prepared_parts);
auto result = getAnalysisResult();
auto index_stats = std::move(result.index_stats);
if (!index_stats.empty())
@ -1215,4 +1270,20 @@ void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const
}
}
/// True when `result` currently holds an exception rather than an AnalysisResult.
bool MergeTreeDataSelectAnalysisResult::error() const
{
    return std::get_if<std::exception_ptr>(&result) != nullptr;
}
/// Rethrows the stored exception if `result` holds one.
/// Otherwise returns `num_granules_after` of the last `index_stats` entry,
/// or 0 when no index statistics were recorded.
size_t MergeTreeDataSelectAnalysisResult::marks() const
{
    /// `result` is const inside this method, so std::move could only ever produce a copy;
    /// the exception pointer is passed as is to make that explicit.
    if (const auto * stored_exception = std::get_if<std::exception_ptr>(&result))
        std::rethrow_exception(*stored_exception);

    const auto & index_stats = std::get<ReadFromMergeTree::AnalysisResult>(result).index_stats;
    if (index_stats.empty())
        return 0;

    return index_stats.back().num_granules_after;
}
}

View File

@ -9,6 +9,18 @@ using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
class Pipe;
/// Sampling information produced by MergeTreeDataSelectExecutor::getSampling()
/// and stored in ReadFromMergeTree::AnalysisResult::sampling.
struct MergeTreeDataSelectSamplingData
{
/// NOTE(review): presumably set when sampling applies to the query — confirm in getSampling().
bool use_sampling = false;
/// When set, ReadFromMergeTree::selectRangesToRead() returns early,
/// before parts are filtered by primary key and skip indexes.
bool read_nothing = false;
/// Defaults to 1.0. NOTE(review): presumably the factor behind the _sample_factor column — confirm.
Float64 used_sample_factor = 1.0;
/// NOTE(review): presumably the sampling filter as an AST function — confirm in getSampling().
std::shared_ptr<ASTFunction> filter_function;
/// NOTE(review): presumably the same filter as an actions DAG — confirm in getSampling().
ActionsDAGPtr filter_expression;
};
struct MergeTreeDataSelectAnalysisResult;
using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelectAnalysisResult>;
/// This step is created to read from MergeTree* table.
/// For now, it takes a list of parts and creates source from it.
class ReadFromMergeTree final : public ISourceStep
@ -54,6 +66,23 @@ public:
InReverseOrder,
};
/// Outcome of selectRangesToRead(): which parts and ranges will be read,
/// plus the counters used for logging and profile events.
struct AnalysisResult
{
/// Parts with mark ranges returned by filterPartsByPrimaryKeyAndSkipIndexes().
RangesInDataParts parts_with_ranges;
/// Result of MergeTreeDataSelectExecutor::getSampling().
MergeTreeDataSelectSamplingData sampling;
/// Per-index statistics filled in while filtering by partition and by primary key / skip indexes.
IndexStats index_stats;
/// Initialized from the real (non-virtual) column names passed to the analysis.
Names column_names_to_read;
/// Switched to InOrder / InReverseOrder depending on the direction of the input order info.
ReadFromMergeTree::ReadType read_type = ReadFromMergeTree::ReadType::Default;
/// Denominator of "parts by partition key" in the "Selected ..." debug message.
UInt64 total_parts = 0;
/// Number of parts left just before filtering by primary key.
UInt64 parts_before_pk = 0;
/// Size of parts_with_ranges.
UInt64 selected_parts = 0;
/// Total number of ranges to read (logged as "from {} ranges").
UInt64 selected_ranges = 0;
/// Sum of getMarksCount() over the selected parts.
UInt64 selected_marks = 0;
/// Numerator of "marks by primary key" in the "Selected ..." debug message.
UInt64 selected_marks_pk = 0;
/// Sum of getMarksCountWithoutFinal() over the parts that reached primary key filtering.
UInt64 total_marks_pk = 0;
/// Sum of getRowsCount() over the selected parts.
UInt64 selected_rows = 0;
};
ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
Names real_column_names_,
@ -67,7 +96,8 @@ public:
size_t num_streams_,
bool sample_factor_column_queried_,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_
Poco::Logger * log_,
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_
);
String getName() const override { return "ReadFromMergeTree"; }
@ -84,6 +114,20 @@ public:
UInt64 getSelectedParts() const { return selected_parts; }
UInt64 getSelectedRows() const { return selected_rows; }
UInt64 getSelectedMarks() const { return selected_marks; }
static MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
const MergeTreeData & data,
const Names & real_column_names,
bool sample_factor_column_queried,
Poco::Logger * log);
private:
const MergeTreeReaderSettings reader_settings;
@ -137,8 +181,17 @@ private:
const Names & column_names,
ActionsDAGPtr & out_projection);
struct AnalysisResult;
AnalysisResult selectRangesToRead(MergeTreeData::DataPartsVector parts) const;
MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts) const;
ReadFromMergeTree::AnalysisResult getAnalysisResult() const;
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr;
};
/// Holder for the outcome of a MergeTree read analysis: either the successful
/// ReadFromMergeTree::AnalysisResult or the exception captured while analyzing.
/// Keeping the exception lets callers probe a candidate without throwing and rethrow it later.
struct MergeTreeDataSelectAnalysisResult
{
std::variant<std::exception_ptr, ReadFromMergeTree::AnalysisResult> result;
/// True if `result` holds an exception.
bool error() const;
/// Rethrows the stored exception if there is one; otherwise returns `num_granules_after`
/// of the last `index_stats` entry, or 0 when `index_stats` is empty.
size_t marks() const;
};
}

View File

@ -51,6 +51,7 @@
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/algorithm/string/join.hpp>
@ -3942,7 +3943,7 @@ static void selectBestProjection(
if (projection_parts.empty())
return;
auto sum_marks = reader.estimateNumMarksToRead(
auto projection_result_ptr = reader.estimateNumMarksToRead(
projection_parts,
candidate.required_columns,
metadata_snapshot,
@ -3952,6 +3953,10 @@ static void selectBestProjection(
settings.max_threads,
max_added_blocks);
if (projection_result_ptr->error())
return;
auto sum_marks = projection_result_ptr->marks();
if (normal_parts.empty())
{
// All parts are projection parts which allows us to use in_order_optimization.
@ -3960,7 +3965,7 @@ static void selectBestProjection(
}
else
{
sum_marks += reader.estimateNumMarksToRead(
auto normal_result_ptr = reader.estimateNumMarksToRead(
normal_parts,
required_columns,
metadata_snapshot,
@ -3969,7 +3974,14 @@ static void selectBestProjection(
query_context,
settings.max_threads,
max_added_blocks);
if (normal_result_ptr->error())
return;
sum_marks += normal_result_ptr->marks();
candidate.merge_tree_normal_select_result_ptr = normal_result_ptr;
}
candidate.merge_tree_projection_select_result_ptr = projection_result_ptr;
// We choose the projection with least sum_marks to read.
if (sum_marks < min_sum_marks)
@ -4190,10 +4202,25 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
auto parts = getDataPartsVector();
MergeTreeDataSelectExecutor reader(*this);
query_info.merge_tree_select_result_ptr = reader.estimateNumMarksToRead(
parts,
analysis_result.required_columns,
metadata_snapshot,
metadata_snapshot,
query_info,
query_context,
settings.max_threads,
max_added_blocks);
size_t min_sum_marks = std::numeric_limits<size_t>::max();
if (!query_info.merge_tree_select_result_ptr->error())
{
// Add 1 to base sum_marks so that we prefer projections even when they have equal number of marks to read.
// NOTE: It is not clear if we need it. E.g. projections do not support skip index for now.
min_sum_marks = query_info.merge_tree_select_result_ptr->marks() + 1;
}
ProjectionCandidate * selected_candidate = nullptr;
size_t min_sum_marks = std::numeric_limits<size_t>::max();
bool has_ordinary_projection = false;
/// Favor aggregate projections
for (auto & candidate : candidates)
{
@ -4212,44 +4239,25 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
selected_candidate,
min_sum_marks);
}
else
has_ordinary_projection = true;
}
/// Select the best normal projection if no aggregate projection is available
if (!selected_candidate && has_ordinary_projection)
/// Select the best normal projection.
for (auto & candidate : candidates)
{
min_sum_marks = reader.estimateNumMarksToRead(
parts,
analysis_result.required_columns,
metadata_snapshot,
metadata_snapshot,
query_info,
query_context,
settings.max_threads,
max_added_blocks);
// Add 1 to base sum_marks so that we prefer projections even when they have equal number of marks to read.
// NOTE: It is not clear if we need it. E.g. projections do not support skip index for now.
min_sum_marks += 1;
for (auto & candidate : candidates)
if (candidate.desc->type == ProjectionDescription::Type::Normal)
{
if (candidate.desc->type == ProjectionDescription::Type::Normal)
{
selectBestProjection(
reader,
metadata_snapshot,
query_info,
analysis_result.required_columns,
candidate,
query_context,
max_added_blocks,
settings,
parts,
selected_candidate,
min_sum_marks);
}
selectBestProjection(
reader,
metadata_snapshot,
query_info,
analysis_result.required_columns,
candidate,
query_context,
max_added_blocks,
settings,
parts,
selected_candidate,
min_sum_marks);
}
}
@ -4263,7 +4271,6 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
}
query_info.projection = std::move(*selected_candidate);
return true;
}
return false;

View File

@ -133,11 +133,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
{
const auto & settings = context->getSettingsRef();
auto parts = data.getDataPartsVector();
if (!query_info.projection)
{
auto plan = readFromParts(
parts,
query_info.merge_tree_select_result_ptr ? MergeTreeData::DataPartsVector{} : data.getDataPartsVector(),
column_names_to_return,
metadata_snapshot,
metadata_snapshot,
@ -145,7 +144,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
context,
max_block_size,
num_streams,
max_block_numbers_to_read);
max_block_numbers_to_read,
query_info.merge_tree_select_result_ptr);
if (plan->isInitialized() && settings.allow_experimental_projection_optimization && settings.force_optimize_projection
&& !metadata_snapshot->projections.empty())
@ -162,27 +162,15 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
ProjectionDescription::typeToString(query_info.projection->desc->type),
query_info.projection->desc->name);
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
for (const auto & part : parts)
{
const auto & projections = part->getProjectionParts();
auto it = projections.find(query_info.projection->desc->name);
if (it != projections.end())
projection_parts.push_back(it->second);
else
normal_parts.push_back(part);
}
Pipes pipes;
Pipe projection_pipe;
Pipe ordinary_pipe;
if (!projection_parts.empty())
if (query_info.projection->merge_tree_projection_select_result_ptr)
{
LOG_DEBUG(log, "projection required columns: {}", fmt::join(query_info.projection->required_columns, ", "));
auto plan = readFromParts(
projection_parts,
{},
query_info.projection->required_columns,
metadata_snapshot,
query_info.projection->desc->metadata,
@ -190,7 +178,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
context,
max_block_size,
num_streams,
max_block_numbers_to_read);
max_block_numbers_to_read,
query_info.projection->merge_tree_projection_select_result_ptr);
if (plan)
{
@ -222,9 +211,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
}
}
if (!normal_parts.empty())
if (query_info.projection->merge_tree_normal_select_result_ptr)
{
auto storage_from_base_parts_of_projection = StorageFromMergeTreeDataPart::create(std::move(normal_parts));
auto storage_from_base_parts_of_projection
= StorageFromMergeTreeDataPart::create(data, query_info.projection->merge_tree_normal_select_result_ptr);
auto interpreter = InterpreterSelectQuery(
query_info.query,
context,
@ -763,8 +753,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
Poco::Logger * log,
size_t num_streams,
ReadFromMergeTree::IndexStats & index_stats,
bool use_skip_indexes,
bool check_limits)
bool use_skip_indexes)
{
RangesInDataParts parts_with_ranges(parts.size());
const Settings & settings = context->getSettingsRef();
@ -892,7 +881,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
if (!ranges.ranges.empty())
{
if (check_limits && (limits.max_rows || leaf_limits.max_rows))
if (limits.max_rows || leaf_limits.max_rows)
{
/// Fail fast if estimated number of rows to read exceeds the limit
auto current_rows_estimate = ranges.getRowsCount();
@ -1082,7 +1071,7 @@ static void selectColumnNames(
}
}
size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot_base,
@ -1094,7 +1083,8 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
{
size_t total_parts = parts.size();
if (total_parts == 0)
return 0;
return std::make_shared<MergeTreeDataSelectAnalysisResult>(
MergeTreeDataSelectAnalysisResult{.result = ReadFromMergeTree::AnalysisResult()});
Names real_column_names;
Names virt_column_names;
@ -1104,63 +1094,18 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
auto part_values = filterPartsByVirtualColumns(data, parts, query_info.query, context);
if (part_values && part_values->empty())
return 0;
/// If there are only virtual columns in the query, you must request at least one non-virtual one.
if (real_column_names.empty())
{
NamesAndTypesList available_real_columns = metadata_snapshot->getColumns().getAllPhysical();
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
}
metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID());
const auto & primary_key = metadata_snapshot->getPrimaryKey();
Names primary_key_columns = primary_key.column_names;
KeyCondition key_condition(query_info, context, primary_key_columns, primary_key.expression);
if (key_condition.alwaysUnknownOrTrue())
{
size_t total_marks = 0;
for (const auto & part : parts)
total_marks += part->index_granularity.getMarksCountWithoutFinal();
return total_marks;
}
const auto & select = query_info.query->as<ASTSelectQuery &>();
ReadFromMergeTree::IndexStats index_stats;
filterPartsByPartition(
parts, part_values, metadata_snapshot_base, data, query_info,
context, max_block_numbers_to_read.get(), log, index_stats);
auto sampling = MergeTreeDataSelectExecutor::getSampling(
select, metadata_snapshot->getColumns().getAllPhysical(), parts, key_condition,
data, metadata_snapshot, context, sample_factor_column_queried, log);
if (sampling.read_nothing)
return 0;
/// Do not init. It is not used (cause skip index is ignored)
MergeTreeReaderSettings reader_settings;
auto parts_with_ranges = filterPartsByPrimaryKeyAndSkipIndexes(
return ReadFromMergeTree::selectRangesToRead(
std::move(parts),
metadata_snapshot_base,
metadata_snapshot,
query_info,
context,
key_condition,
reader_settings,
log,
num_streams,
index_stats,
true /* use_skip_indexes */,
false /* check_limits */);
return index_stats.back().num_granules_after;
max_block_numbers_to_read,
data,
real_column_names,
sample_factor_column_queried,
log);
}
QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
@ -1172,10 +1117,16 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
ContextPtr context,
const UInt64 max_block_size,
const unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read,
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr) const
{
size_t total_parts = parts.size();
if (total_parts == 0)
/// If merge_tree_select_result_ptr != nullptr, we use analyzed result so parts will always be empty.
if (merge_tree_select_result_ptr)
{
if (merge_tree_select_result_ptr->marks() == 0)
return std::make_unique<QueryPlan>();
}
else if (parts.empty())
return std::make_unique<QueryPlan>();
Names real_column_names;
@ -1187,7 +1138,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
auto read_from_merge_tree = std::make_unique<ReadFromMergeTree>(
parts,
std::move(parts),
real_column_names,
virt_column_names,
data,
@ -1199,7 +1150,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
num_streams,
sample_factor_column_queried,
max_block_numbers_to_read,
log
log,
merge_tree_select_result_ptr
);
QueryPlanPtr plan = std::make_unique<QueryPlan>();

View File

@ -13,15 +13,6 @@ namespace DB
class KeyCondition;
struct MergeTreeDataSelectSamplingData
{
bool use_sampling = false;
bool read_nothing = false;
Float64 used_sample_factor = 1.0;
std::shared_ptr<ASTFunction> filter_function;
ActionsDAGPtr filter_expression;
};
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
/** Executes SELECT queries on data from the merge tree.
@ -55,12 +46,13 @@ public:
ContextPtr context,
UInt64 max_block_size,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr,
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr = nullptr) const;
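/// Get an estimation for the number of marks we are going to read.
/// Reads nothing. Secondary indexes are not used.
/// This method is used to select the best projection for a table.
/// Returns the whole analysis result (or the captured exception); call marks() on it to get the number.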
size_t estimateNumMarksToRead(
MergeTreeDataSelectAnalysisResultPtr estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot_base,
@ -185,8 +177,7 @@ public:
Poco::Logger * log,
size_t num_streams,
ReadFromMergeTree::IndexStats & index_stats,
bool use_skip_indexes,
bool check_limits);
bool use_skip_indexes);
/// Create expression for sampling.
/// Also, calculate _sample_factor if needed.

View File

@ -31,8 +31,7 @@ public:
size_t max_block_size,
unsigned num_streams) override
{
// NOTE: It's used to read normal parts only
QueryPlan query_plan = std::move(*MergeTreeDataSelectExecutor(parts.front()->storage)
QueryPlan query_plan = std::move(*MergeTreeDataSelectExecutor(storage)
.readFromParts(
parts,
column_names,
@ -41,7 +40,9 @@ public:
query_info,
context,
max_block_size,
num_streams));
num_streams,
nullptr,
analysis_result_ptr));
return query_plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
@ -54,54 +55,53 @@ public:
bool mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const override
{
return parts.front()->storage.mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
return storage.mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
}
NamesAndTypesList getVirtuals() const override
{
return parts.front()->storage.getVirtuals();
return storage.getVirtuals();
}
String getPartitionId() const
{
return parts.front()->info.partition_id;
return partition_id;
}
String getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr context) const
{
return parts.front()->storage.getPartitionIDFromQuery(ast, context);
return storage.getPartitionIDFromQuery(ast, context);
}
protected:
/// Used in part mutation.
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(getIDFromPart(part_))
, parts({part_})
, storage(part_->storage)
, partition_id(part_->info.partition_id)
{
setInMemoryMetadata(part_->storage.getInMemoryMetadata());
setInMemoryMetadata(storage.getInMemoryMetadata());
}
StorageFromMergeTreeDataPart(MergeTreeData::DataPartsVector && parts_)
: IStorage(getIDFromParts(parts_))
, parts(std::move(parts_))
/// Used in queries with projection.
StorageFromMergeTreeDataPart(const MergeTreeData & storage_, MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr_)
: IStorage(storage_.getStorageID()), storage(storage_), analysis_result_ptr(analysis_result_ptr_)
{
setInMemoryMetadata(parts.front()->storage.getInMemoryMetadata());
setInMemoryMetadata(storage.getInMemoryMetadata());
}
private:
MergeTreeData::DataPartsVector parts;
const MergeTreeData & storage;
String partition_id;
MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr;
/// Builds the ID of a storage that wraps a single part: the database name of the part's
/// table, and that table's name followed by " (part <part name>)".
static StorageID getIDFromPart(const MergeTreeData::DataPartPtr & part_)
{
    const auto source_id = part_->storage.getStorageID();
    const String decorated_table_name = source_id.table_name + " (part " + part_->name + ")";
    return StorageID(source_id.database_name, decorated_table_name);
}
static StorageID getIDFromParts(const MergeTreeData::DataPartsVector & parts_)
{
assert(!parts_.empty());
auto table_id = parts_.front()->storage.getStorageID();
return StorageID(table_id.database_name, table_id.table_name + " (parts)");
}
};
}

View File

@ -6,8 +6,6 @@
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Functions/IFunction.h>
#include <Interpreters/TableJoin.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
namespace DB
{

View File

@ -39,6 +39,9 @@ using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct MergeTreeDataSelectAnalysisResult;
using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelectAnalysisResult>;
struct PrewhereInfo
{
/// Actions which are executed in order to alias columns are used for prewhere actions.
@ -118,6 +121,8 @@ struct ProjectionCandidate
ReadInOrderOptimizerPtr order_optimizer;
InputOrderInfoPtr input_order_info;
ManyExpressionActions group_by_elements_actions;
MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr;
MergeTreeDataSelectAnalysisResultPtr merge_tree_normal_select_result_ptr;
};
/** Query along with some additional data,
@ -158,6 +163,7 @@ struct SelectQueryInfo
std::optional<ProjectionCandidate> projection;
bool ignore_projections = false;
bool is_projection_query = false;
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr;
};
}