Add more comments.

This commit is contained in:
Nikolai Kochetov 2021-05-28 20:16:09 +03:00
parent 295a302bc8
commit 58fbc544cc
4 changed files with 66 additions and 47 deletions

View File

@ -17,6 +17,7 @@
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/VirtualColumnUtils.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
@ -43,7 +44,6 @@ struct ReadFromMergeTree::AnalysisResult
{
RangesInDataParts parts_with_ranges;
MergeTreeDataSelectSamplingData sampling;
String query_id;
IndexStats index_stats;
Names column_names_to_read;
ReadFromMergeTree::ReadType read_type = ReadFromMergeTree::ReadType::Default;
@ -830,14 +830,14 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
LOG_DEBUG(log, "Key condition: {}", key_condition.toString());
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto query_context = context->hasQueryContext() ? context->getQueryContext() : context;
MergeTreeDataSelectExecutor::filterPartsByPartition(
metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read.get(), log, result.index_stats);
parts, part_values, metadata_snapshot_base, data, query_info, context,
max_block_numbers_to_read.get(), log, result.index_stats);
result.sampling = MergeTreeDataSelectExecutor::getSampling(
select, parts, metadata_snapshot, key_condition,
data, log, sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context);
select, metadata_snapshot->getColumns().getAllPhysical(), parts, key_condition,
data, metadata_snapshot, context, sample_factor_column_queried, log);
if (result.sampling.read_nothing)
return result;
@ -885,8 +885,6 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
sum_marks,
sum_ranges);
result.query_id = MergeTreeDataSelectExecutor::checkLimits(data, result.parts_with_ranges, context);
ProfileEvents::increment(ProfileEvents::SelectedParts, result.parts_with_ranges.size());
ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);
@ -905,6 +903,8 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
auto result = selectRangesToRead(prepared_parts);
auto query_id_holder = MergeTreeDataSelectExecutor::checkLimits(data, result.parts_with_ranges, context);
if (result.parts_with_ranges.empty())
{
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
@ -1048,8 +1048,8 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
processors.emplace_back(processor);
// Attach QueryIdHolder if needed
if (!result.query_id.empty())
pipe.addQueryIdHolder(std::make_shared<QueryIdHolder>(result.query_id, data));
if (query_id_holder)
pipe.addQueryIdHolder(std::move(query_id_holder));
pipeline.init(std::move(pipe));
}

View File

@ -1,15 +1,14 @@
#pragma once
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/Pipe.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
//#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
namespace DB
{
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
class Pipe;
/// This step is created to read from MergeTree* table.
/// For now, it takes a list of parts and creates source from it.
class ReadFromMergeTree final : public ISourceStep

View File

@ -381,14 +381,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
const ASTSelectQuery & select,
NamesAndTypesList available_real_columns,
const MergeTreeData::DataPartsVector & parts,
const StorageMetadataPtr & metadata_snapshot,
KeyCondition & key_condition,
const MergeTreeData & data,
Poco::Logger * log,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool sample_factor_column_queried,
NamesAndTypesList available_real_columns,
ContextPtr context)
Poco::Logger * log)
{
const Settings & settings = context->getSettingsRef();
/// Sampling.
@ -643,7 +643,7 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(
const MergeTreeData & data,
MergeTreeData::DataPartsVector & parts,
const MergeTreeData::DataPartsVector & parts,
const ASTPtr & query,
ContextPtr context)
{
@ -666,13 +666,12 @@ std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPar
}
void MergeTreeDataSelectExecutor::filterPartsByPartition(
MergeTreeData::DataPartsVector & parts,
const std::optional<std::unordered_set<String>> & part_values,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & data,
const SelectQueryInfo & query_info,
const ContextPtr & context,
const ContextPtr & query_context,
MergeTreeData::DataPartsVector & parts,
const std::optional<std::unordered_set<String>> & part_values,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
Poco::Logger * log,
ReadFromMergeTree::IndexStats & index_stats)
@ -709,6 +708,7 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
}
}
auto query_context = context->hasQueryContext() ? context->getQueryContext() : context;
PartFilterCounters part_filter_counters;
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
selectPartsToReadWithUUIDFilter(
@ -766,7 +766,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
StorageMetadataPtr metadata_snapshot,
const SelectQueryInfo & query_info,
const ContextPtr & context,
KeyCondition & key_condition,
const KeyCondition & key_condition,
const MergeTreeReaderSettings & reader_settings,
Poco::Logger * log,
size_t num_streams,
@ -992,7 +992,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
return parts_with_ranges;
}
String MergeTreeDataSelectExecutor::checkLimits(
std::shared_ptr<QueryIdHolder> MergeTreeDataSelectExecutor::checkLimits(
const MergeTreeData & data,
const RangesInDataParts & parts_with_ranges,
const ContextPtr & context)
@ -1032,7 +1032,10 @@ String MergeTreeDataSelectExecutor::checkLimits(
}
}
return query_id;
if (!query_id.empty())
return std::make_shared<QueryIdHolder>(query_id, data);
return nullptr;
}
static void selectColumnNames(
@ -1135,15 +1138,15 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
}
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto query_context = context->hasQueryContext() ? context->getQueryContext() : context;
ReadFromMergeTree::IndexStats index_stats;
filterPartsByPartition(
metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read.get(), log, index_stats);
parts, part_values, metadata_snapshot_base, data, query_info,
context, max_block_numbers_to_read.get(), log, index_stats);
auto sampling = MergeTreeDataSelectExecutor::getSampling(
select, parts, metadata_snapshot, key_condition,
data, log, sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context);
select, metadata_snapshot->getColumns().getAllPhysical(), parts, key_condition,
data, metadata_snapshot, context, sample_factor_column_queried, log);
if (sampling.read_nothing)
return 0;

View File

@ -45,16 +45,7 @@ public:
QueryProcessingStage::Enum processed_stage,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;
size_t estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;
/// The same as read, but with specified set of parts.
QueryPlanPtr readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
@ -66,6 +57,19 @@ public:
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;
/// Get an estimation for the number of marks we are going to read.
/// Reads nothing. Secondary indexes are not used.
/// This method is used to select best projection for table.
size_t estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;
private:
const MergeTreeData & data;
Poco::Logger * log;
@ -131,12 +135,15 @@ private:
Poco::Logger * log);
public:
/// For given number rows and bytes, get the number of marks to read.
/// It is a minimal number of marks which contain so many rows and bytes.
static size_t roundRowsOrBytesToMarks(
size_t rows_setting,
size_t bytes_setting,
size_t rows_granularity,
size_t bytes_granularity);
/// The same as roundRowsOrBytesToMarks, but return no more than max_marks.
static size_t minMarksForConcurrentRead(
size_t rows_setting,
size_t bytes_setting,
@ -144,48 +151,58 @@ public:
size_t bytes_granularity,
size_t max_marks);
/// If possible, filter using expression on virtual columns.
/// Example: SELECT count() FROM table WHERE _part = 'part_name'
/// If expression found, return a set with allowed part names (std::nullopt otherwise).
static std::optional<std::unordered_set<String>> filterPartsByVirtualColumns(
const MergeTreeData & data,
MergeTreeData::DataPartsVector & parts,
const MergeTreeData::DataPartsVector & parts,
const ASTPtr & query,
ContextPtr context);
/// Filter parts using minmax index and partition key.
static void filterPartsByPartition(
MergeTreeData::DataPartsVector & parts,
const std::optional<std::unordered_set<String>> & part_values,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & data,
const SelectQueryInfo & query_info,
const ContextPtr & context,
const ContextPtr & query_context,
MergeTreeData::DataPartsVector & parts,
const std::optional<std::unordered_set<String>> & part_values,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
Poco::Logger * log,
ReadFromMergeTree::IndexStats & index_stats);
/// Filter parts using primary key and secondary indexes.
/// For every part, select mark ranges to read.
static RangesInDataParts filterPartsByPrimaryKeyAndSkipIndexes(
MergeTreeData::DataPartsVector && parts,
StorageMetadataPtr metadata_snapshot,
const SelectQueryInfo & query_info,
const ContextPtr & context,
KeyCondition & key_condition,
const KeyCondition & key_condition,
const MergeTreeReaderSettings & reader_settings,
Poco::Logger * log,
size_t num_streams,
ReadFromMergeTree::IndexStats & index_stats,
bool use_skip_indexes);
/// Create expression for sampling.
/// Also, calculate _sample_factor if needed.
/// Also, update key condition with selected sampling range.
static MergeTreeDataSelectSamplingData getSampling(
const ASTSelectQuery & select,
NamesAndTypesList available_real_columns,
const MergeTreeData::DataPartsVector & parts,
const StorageMetadataPtr & metadata_snapshot,
KeyCondition & key_condition,
const MergeTreeData & data,
Poco::Logger * log,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool sample_factor_column_queried,
NamesAndTypesList available_real_columns,
ContextPtr context);
Poco::Logger * log);
static String checkLimits(
/// Check query limits: max_partitions_to_read, max_concurrent_queries.
/// Also, return QueryIdHolder. If not null, we should keep it until query finishes.
static std::shared_ptr<QueryIdHolder> checkLimits(
const MergeTreeData & data,
const RangesInDataParts & parts_with_ranges,
const ContextPtr & context);