Nikolai Kochetov 2021-05-27 16:40:33 +03:00
parent a51a6ea0b7
commit cbdf3752ef
7 changed files with 1116 additions and 1746 deletions

View File

@@ -3,6 +3,9 @@
#include <Processors/ConcatProcessor.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/CopyTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
@@ -11,8 +14,6 @@
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/CopyTransform.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
@@ -88,38 +89,35 @@ size_t minMarksForConcurrentRead(
}
ReadFromMergeTree::ReadFromMergeTree(
SelectQueryInfo query_info_,
const MergeTreeDataSelectExecutor::PartitionIdToMaxBlock * max_block_numbers_to_read_,
const SelectQueryInfo & query_info_,
const PartitionIdToMaxBlock * max_block_numbers_to_read_,
ContextPtr context_,
const MergeTreeData & data_,
const MergeTreeData & storage_,
StorageMetadataPtr metadata_snapshot_,
StorageMetadataPtr metadata_snapshot_base_,
Names real_column_names_,
MergeTreeData::DataPartsVector parts_,
//IndexStatPtr index_stats_,
PrewhereInfoPtr prewhere_info_,
Names virt_column_names_,
Settings settings_,
size_t num_streams_)
Poco::Logger * log_)
: ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader(
metadata_snapshot_->getSampleBlockForColumns(real_column_names_, storage_.getVirtuals(), storage_.getStorageID()),
metadata_snapshot_->getSampleBlockForColumns(real_column_names_, data_.getVirtuals(), data_.getStorageID()),
prewhere_info_,
storage_.getPartitionValueType(),
data_.getPartitionValueType(),
virt_column_names_)})
, query_info(std::move(query_info_))
, max_block_numbers_to_read(max_block_numbers_to_read_)
, context(std::move(context_))
, data(data_)
, storage(storage_)
, metadata_snapshot(std::move(metadata_snapshot_))
, metadata_snapshot_base(std::move(metadata_snapshot_base_))
, real_column_names(std::move(real_column_names_))
, parts(std::move(parts_))
, prepared_parts(std::move(parts_))
, prewhere_info(std::move(prewhere_info_))
, virt_column_names(std::move(virt_column_names_))
, settings(std::move(settings_))
, num_streams(num_streams_)
, log(log_)
{
}
@@ -142,7 +140,7 @@ Pipe ReadFromMergeTree::readFromPool(
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage,
data,
metadata_snapshot,
prewhere_info,
true,
@@ -151,7 +149,7 @@ Pipe ReadFromMergeTree::readFromPool(
settings.preferred_block_size_bytes,
false);
auto * logger = &Poco::Logger::get(storage.getLogName() + " (SelectExecutor)");
auto * logger = &Poco::Logger::get(data.getLogName() + " (SelectExecutor)");
LOG_DEBUG(logger, "Reading approx. {} rows with {} streams", total_rows, used_max_streams);
for (size_t i = 0; i < used_max_streams; ++i)
@@ -159,7 +157,7 @@ Pipe ReadFromMergeTree::readFromPool(
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
i, pool, min_marks_for_concurrent_read, settings.max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
storage, metadata_snapshot, use_uncompressed_cache,
data, metadata_snapshot, use_uncompressed_cache,
prewhere_info, settings.reader_settings, virt_column_names);
if (i == 0)
@@ -178,7 +176,7 @@ template<typename TSource>
ProcessorPtr ReadFromMergeTree::createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache)
{
return std::make_shared<TSource>(
storage, metadata_snapshot, part.data_part, settings.max_block_size, settings.preferred_block_size_bytes,
data, metadata_snapshot, part.data_part, settings.max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache,
prewhere_info, true, settings.reader_settings, virt_column_names, part.part_index_in_query);
}
@@ -212,7 +210,7 @@ Pipe ReadFromMergeTree::read(
RangesInDataParts parts_with_range, Names required_columns, ReadType read_type,
size_t used_max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache)
{
if (read_type == ReadType::Default && num_streams > 1)
if (read_type == ReadType::Default && used_max_streams > 1)
return readFromPool(parts_with_range, required_columns, used_max_streams, min_marks_for_concurrent_read, use_uncompressed_cache);
auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache);
@@ -225,126 +223,6 @@ Pipe ReadFromMergeTree::read(
return pipe;
}
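Worth noting: besides the rename, this hunk fixes a latent bug — the old condition checked the member num_streams rather than the used_max_streams argument, so a caller requesting a single stream could still be routed to the pooled reader. A minimal standalone sketch of the corrected dispatch (simplified stand-in types, not ClickHouse's real Pipe/ReadType machinery):

#include <cstddef>

// Simplified stand-ins, not the real ClickHouse types.
enum class ReadType { Default, InOrder, InReverseOrder };
struct Pipe { const char * source = ""; };

// Many sources pulling work from one shared prefetch pool.
Pipe readFromPool(std::size_t /*streams*/) { return {"pool"}; }
// One source per part, preserving (possibly reverse) order.
Pipe readInOrder(ReadType /*type*/) { return {"in_order"}; }

// Mirrors the fixed dispatch: the decision uses the stream count passed to
// this call (used_max_streams), not the step-wide member the old code read.
Pipe read(ReadType read_type, std::size_t used_max_streams)
{
    if (read_type == ReadType::Default && used_max_streams > 1)
        return readFromPool(used_max_streams);
    return readInOrder(read_type);
}

int main() { return read(ReadType::Default, 8).source[0] == 'p' ? 0 : 1; }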
static std::optional<std::unordered_set<String>> filterPartsByVirtualColumns(
const MergeTreeData & data,
MergeTreeData::DataPartsVector & parts,
ASTPtr & query,
ContextPtr context)
{
std::unordered_set<String> part_values;
ASTPtr expression_ast;
auto virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, true /* one_part */);
// Generate valid expressions for filtering
VirtualColumnUtils::prepareFilterBlockWithQuery(query, context, virtual_columns_block, expression_ast);
// If there is still something left, fill the virtual block and do the filtering.
if (expression_ast)
{
virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, false /* one_part */);
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context, expression_ast);
return VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
}
return {};
}
static void filterPartsByPartition(
StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & data,
SelectQueryInfo & query_info,
ContextPtr & context,
ContextPtr & query_context,
MergeTreeData::DataPartsVector & parts,
const std::optional<std::unordered_set<String>> & part_values,
const MergeTreeDataSelectExecutor::PartitionIdToMaxBlock * max_block_numbers_to_read,
Poco::Logger * log,
ReadFromMergeTree::IndexStats & index_stats)
{
const Settings & settings = context->getSettingsRef();
std::optional<PartitionPruner> partition_pruner;
std::optional<KeyCondition> minmax_idx_condition;
DataTypes minmax_columns_types;
if (metadata_snapshot->hasPartitionKey())
{
const auto & partition_key = metadata_snapshot->getPartitionKey();
auto minmax_columns_names = data.getMinMaxColumnsNames(partition_key);
minmax_columns_types = data.getMinMaxColumnsTypes(partition_key);
minmax_idx_condition.emplace(
query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context)));
partition_pruner.emplace(metadata_snapshot->getPartitionKey(), query_info, context, false /* strict */);
if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless()))
{
String msg = "Neither MinMax index by columns (";
bool first = true;
for (const String & col : minmax_columns_names)
{
if (first)
first = false;
else
msg += ", ";
msg += col;
}
msg += ") nor partition expr is used and setting 'force_index_by_date' is set";
throw Exception(msg, ErrorCodes::INDEX_NOT_USED);
}
}
MergeTreeDataSelectExecutor::PartFilterCounters part_filter_counters;
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
parts,
part_values,
data.getPinnedPartUUIDs(),
minmax_idx_condition,
minmax_columns_types,
partition_pruner,
max_block_numbers_to_read,
query_context,
part_filter_counters,
log);
else
MergeTreeDataSelectExecutor::selectPartsToRead(
parts,
part_values,
minmax_idx_condition,
minmax_columns_types,
partition_pruner,
max_block_numbers_to_read,
part_filter_counters);
index_stats.emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::None,
.num_parts_after = part_filter_counters.num_initial_selected_parts,
.num_granules_after = part_filter_counters.num_initial_selected_granules});
if (minmax_idx_condition)
{
auto description = minmax_idx_condition->getDescription();
index_stats.emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::MinMax,
.condition = std::move(description.condition),
.used_keys = std::move(description.used_keys),
.num_parts_after = part_filter_counters.num_parts_after_minmax,
.num_granules_after = part_filter_counters.num_granules_after_minmax});
LOG_DEBUG(log, "MinMax index condition: {}", minmax_idx_condition->toString());
}
if (partition_pruner)
{
auto description = partition_pruner->getKeyCondition().getDescription();
index_stats.emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::Partition,
.condition = std::move(description.condition),
.used_keys = std::move(description.used_keys),
.num_parts_after = part_filter_counters.num_parts_after_partition_pruner,
.num_granules_after = part_filter_counters.num_granules_after_partition_pruner});
}
}
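These two helpers are not deleted outright: per the header diff further below, they move into MergeTreeDataSelectExecutor as public statics so that the plan step and the new mark-estimation path can share them. A compilable toy model of the resulting part-filtering funnel (stand-in Part type with a [min, max] date range; the real signatures appear in the header diff), with primary-key/skip-index pruning left as the third stage:

// Requires C++20 (std::erase_if, unordered_set::contains).
#include <optional>
#include <string>
#include <unordered_set>
#include <vector>

// Stand-in: a "part" is just a name and a [min, max] date range here.
struct Part { std::string name; int min_date, max_date; };
using Parts = std::vector<Part>;

// Stage 1: explicit _part values extracted from the WHERE clause, if any.
void filterByVirtualColumns(Parts & parts, const std::optional<std::unordered_set<std::string>> & part_values)
{
    if (!part_values)
        return;
    std::erase_if(parts, [&](const Part & p) { return !part_values->contains(p.name); });
}

// Stage 2: minmax/partition pruning (modeled as a date-interval condition).
void filterByPartition(Parts & parts, int query_min, int query_max)
{
    std::erase_if(parts, [&](const Part & p) { return p.max_date < query_min || p.min_date > query_max; });
}

int main()
{
    Parts parts{{"p1", 1, 10}, {"p2", 11, 20}, {"p3", 21, 30}};
    filterByVirtualColumns(parts, std::unordered_set<std::string>{"p1", "p2"});
    filterByPartition(parts, 12, 25);   // leaves only p2
    // Stage 3 (primary key + skip indexes) would then narrow mark ranges
    // inside each surviving part; see filterPartsByPrimaryKeyAndSkipIndexes.
    return parts.size() == 1 ? 0 : 1;
}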
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
RangesInDataParts && parts_with_ranges,
const Names & column_names)
@@ -392,7 +270,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
if (0 == sum_marks)
return {};
size_t used_num_streams = num_streams;
size_t used_num_streams = settings.num_streams;
if (used_num_streams > 1)
{
/// Reduce the number of num_streams if the data is small.
@@ -506,12 +384,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
return new_ranges;
};
const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1;
const size_t min_marks_per_stream = (sum_marks - 1) / settings.num_streams + 1;
bool need_preliminary_merge = (parts_with_ranges.size() > q_settings.read_in_order_two_level_merge_threshold);
Pipes pipes;
for (size_t i = 0; i < num_streams && !parts_with_ranges.empty(); ++i)
for (size_t i = 0; i < settings.num_streams && !parts_with_ranges.empty(); ++i)
{
size_t need_marks = min_marks_per_stream;
RangesInDataParts new_parts;
@@ -577,7 +455,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
: ReadFromMergeTree::ReadType::InReverseOrder;
pipes.emplace_back(read(std::move(new_parts), column_names, read_type,
num_streams, min_marks_for_concurrent_read, use_uncompressed_cache));
settings.num_streams, min_marks_for_concurrent_read, use_uncompressed_cache));
}
if (need_preliminary_merge)
@@ -755,7 +633,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
size_t used_num_streams = num_streams;
size_t used_num_streams = settings.num_streams;
if (used_num_streams > q_settings.max_final_threads)
used_num_streams = q_settings.max_final_threads;
@@ -966,9 +844,10 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
auto parts = std::move(prepared_parts);
size_t total_parts = parts.size();
auto part_values = filterPartsByVirtualColumns(data, parts, query_info.query, context);
auto part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(data, parts, query_info.query, context);
if (part_values && part_values->empty())
{
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
@@ -1001,7 +880,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto query_context = context->hasQueryContext() ? context->getQueryContext() : context;
filterPartsByPartition(
MergeTreeDataSelectExecutor::filterPartsByPartition(
metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read, log, index_stats);
bool sample_factor_column_queried = false;
@@ -1023,16 +902,17 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
for (const auto & part : parts)
total_marks_pk += part->index_granularity.getMarksCountWithoutFinal();
auto parts_with_ranges = MergeTreeDataSelectExecutor::filterParts(
parts,
auto parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
std::move(parts),
metadata_snapshot,
query_info,
context,
key_condition,
settings.reader_settings,
log,
num_streams,
index_stats);
settings.num_streams,
index_stats,
true);
size_t sum_marks_pk = total_marks_pk;
for (const auto & stat : index_stats)
@@ -1132,12 +1012,69 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
column_names_to_read);
}
if (pipe.empty())
{
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
return;
}
if (sampling.use_sampling)
{
auto sampling_actions = std::make_shared<ExpressionActions>(sampling.filter_expression);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header,
sampling_actions,
sampling.filter_function->getColumnName(),
false);
});
}
if (result_projection)
{
auto projection_actions = std::make_shared<ExpressionActions>(result_projection);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, projection_actions);
});
}
/// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
if (sample_factor_column_queried)
{
ColumnWithTypeAndName column;
column.name = "_sample_factor";
column.type = std::make_shared<DataTypeFloat64>();
column.column = column.type->createColumnConst(0, Field(sampling.used_sample_factor));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_action = std::make_shared<ExpressionActions>(adding_column_dag);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, adding_column_action);
});
}
// TODO There seems to be no place initializing remove_columns_actions
if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
{
auto remove_columns_action = std::make_shared<ExpressionActions>(
query_info.prewhere_info->remove_columns_actions->getActionsDAG().clone());
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, remove_columns_action);
});
}
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
// Attach QueryIdHolder if needed
if (!query_id.empty())
pipe.addQueryIdHolder(std::make_shared<QueryIdHolder>(query_id, storage));
pipe.addQueryIdHolder(std::make_shared<QueryIdHolder>(query_id, data));
pipeline.init(std::move(pipe));
}
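With this change the step itself appends the post-read transforms (sampling filter, result projection, constant _sample_factor column, PREWHERE column removal) instead of leaving them to the caller. A toy model of the addSimpleTransform chaining pattern used above, with a block reduced to a list of column names (stand-in types, not the real Pipe/Block):

#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Toy model: a "block" is a list of column names, a "transform" rewrites it.
using Block = std::vector<std::string>;
using Transform = std::function<Block(Block)>;

struct Pipe
{
    std::vector<Transform> transforms;
    void addSimpleTransform(Transform t) { transforms.push_back(std::move(t)); }
    Block run(Block b) const
    {
        for (const auto & t : transforms)
            b = t(b);
        return b;
    }
};

int main()
{
    Pipe pipe;
    // Same order as in initializePipeline: sampling filter first, then the
    // result projection, then the constant _sample_factor column.
    pipe.addSimpleTransform([](Block b) { return b; });                       // sampling: filters rows, keeps columns
    pipe.addSimpleTransform([](Block b) { b.push_back("expr"); return b; });  // projection adds computed columns
    pipe.addSimpleTransform([](Block b) { b.push_back("_sample_factor"); return b; });
    for (const auto & col : pipe.run({"x", "y"}))
        std::cout << col << '\n';
}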
@@ -1161,20 +1098,20 @@ static const char * indexTypeToString(ReadFromMergeTree::IndexType type)
__builtin_unreachable();
}
static const char * readTypeToString(ReadFromMergeTree::ReadType type)
{
switch (type)
{
case ReadFromMergeTree::ReadType::Default:
return "Default";
case ReadFromMergeTree::ReadType::InOrder:
return "InOrder";
case ReadFromMergeTree::ReadType::InReverseOrder:
return "InReverseOrder";
}
// static const char * readTypeToString(ReadFromMergeTree::ReadType type)
// {
// switch (type)
// {
// case ReadFromMergeTree::ReadType::Default:
// return "Default";
// case ReadFromMergeTree::ReadType::InOrder:
// return "InOrder";
// case ReadFromMergeTree::ReadType::InReverseOrder:
// return "InReverseOrder";
// }
__builtin_unreachable();
}
// __builtin_unreachable();
// }
void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
{

View File

@@ -3,11 +3,13 @@
#include <Processors/Pipe.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
//#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
namespace DB
{
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
/// This step is created to read from MergeTree* table.
/// For now, it takes a list of parts and creates source from it.
class ReadFromMergeTree final : public ISourceStep
@@ -42,9 +44,10 @@ public:
struct Settings
{
UInt64 max_block_size;
size_t num_streams;
size_t preferred_block_size_bytes;
size_t preferred_max_column_in_block_size_bytes;
size_t min_marks_for_concurrent_read;
//size_t min_marks_for_concurrent_read;
bool use_uncompressed_cache;
bool force_primary_key;
@@ -68,21 +71,18 @@ public:
};
ReadFromMergeTree(
SelectQueryInfo query_info_,
const MergeTreeDataSelectExecutor::PartitionIdToMaxBlock * max_block_numbers_to_read_,
const SelectQueryInfo & query_info_,
const PartitionIdToMaxBlock * max_block_numbers_to_read_,
ContextPtr context_,
const MergeTreeData & data_,
const MergeTreeData & storage_,
StorageMetadataPtr metadata_snapshot_,
StorageMetadataPtr metadata_snapshot_base_,
Names real_column_names_,
MergeTreeData::DataPartsVector parts_,
//IndexStatPtr index_stats_,
PrewhereInfoPtr prewhere_info_,
Names virt_column_names_,
Settings settings_,
size_t num_streams_,
//ReadType read_type_
Poco::Logger * log_
);
String getName() const override { return "ReadFromMergeTree"; }
@@ -97,23 +97,19 @@ public:
private:
SelectQueryInfo query_info;
const MergeTreeDataSelectExecutor::PartitionIdToMaxBlock * max_block_numbers_to_read;
const PartitionIdToMaxBlock * max_block_numbers_to_read;
ContextPtr context;
const MergeTreeData & data;
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
StorageMetadataPtr metadata_snapshot_base;
Names real_column_names;
MergeTreeData::DataPartsVector parts;
IndexStats index_stats;
MergeTreeData::DataPartsVector prepared_parts;
PrewhereInfoPtr prewhere_info;
IndexStats index_stats;
Names virt_column_names;
Settings settings;
size_t num_streams;
//ReadType read_type;
Poco::Logger * log;
Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t used_max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);

View File

@@ -3839,21 +3839,18 @@ static void selectBestProjection(
if (projection_parts.empty())
return;
candidate.merge_tree_data_select_base_cache = std::make_unique<MergeTreeDataSelectCache>();
candidate.merge_tree_data_select_projection_cache = std::make_unique<MergeTreeDataSelectCache>();
reader.readFromParts(
//candidate.merge_tree_data_select_base_cache = std::make_unique<MergeTreeDataSelectCache>();
//candidate.merge_tree_data_select_projection_cache = std::make_unique<MergeTreeDataSelectCache>();
auto sum_marks = reader.estimateNumMarksToRead(
projection_parts,
candidate.required_columns,
metadata_snapshot,
candidate.desc->metadata,
query_info, // TODO syntax_analysis_result set in index
query_context,
0, // max_block_size is unused when getting cache
settings.max_threads,
max_added_blocks,
candidate.merge_tree_data_select_projection_cache.get());
max_added_blocks);
size_t sum_marks = candidate.merge_tree_data_select_projection_cache->sum_marks;
if (normal_parts.empty())
{
// All parts are projection parts which allows us to use in_order_optimization.
@@ -3862,18 +3859,15 @@ static void selectBestProjection(
}
else
{
reader.readFromParts(
sum_marks += reader.estimateNumMarksToRead(
normal_parts,
required_columns,
metadata_snapshot,
metadata_snapshot,
query_info, // TODO syntax_analysis_result set in index
query_context,
0, // max_block_size is unused when getting cache
settings.max_threads,
max_added_blocks,
candidate.merge_tree_data_select_base_cache.get());
sum_marks += candidate.merge_tree_data_select_base_cache->sum_marks;
max_added_blocks);
}
// We choose the projection with least sum_marks to read.
@@ -4101,7 +4095,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
if (!candidates.empty())
{
// First build a MergeTreeDataSelectCache to check if a projection is indeed better than base
query_info.merge_tree_data_select_cache = std::make_unique<MergeTreeDataSelectCache>();
// query_info.merge_tree_data_select_cache = std::make_unique<MergeTreeDataSelectCache>();
std::unique_ptr<PartitionIdToMaxBlock> max_added_blocks;
if (settings.select_sequential_consistency)
@@ -4112,21 +4106,10 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
auto parts = getDataPartsVector();
MergeTreeDataSelectExecutor reader(*this);
reader.readFromParts(
parts,
analysis_result.required_columns,
metadata_snapshot,
metadata_snapshot,
query_info, // TODO syntax_analysis_result set in index
query_context,
0, // max_block_size is unused when getting cache
settings.max_threads,
max_added_blocks.get(),
query_info.merge_tree_data_select_cache.get());
// Add 1 to base sum_marks so that we prefer projections even when they have equal number of marks to read.
size_t min_sum_marks = query_info.merge_tree_data_select_cache->sum_marks + 1;
ProjectionCandidate * selected_candidate = nullptr;
size_t min_sum_marks = std::numeric_limits<size_t>::max();
bool has_ordinary_projection = false;
/// Favor aggregate projections
for (auto & candidate : candidates)
{
@@ -4145,11 +4128,27 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
selected_candidate,
min_sum_marks);
}
else
has_ordinary_projection = true;
}
/// Select the best normal projection if no aggregate projection is available
if (!selected_candidate)
if (!selected_candidate && has_ordinary_projection)
{
min_sum_marks = reader.estimateNumMarksToRead(
parts,
analysis_result.required_columns,
metadata_snapshot,
metadata_snapshot,
query_info, // TODO syntax_analysis_result set in index
query_context,
settings.max_threads,
max_added_blocks.get());
// Add 1 to base sum_marks so that we prefer projections even when they have equal number of marks to read.
// NOTE: It is not clear if we need it. E.g. projections do not support skip index for now.
min_sum_marks += 1;
for (auto & candidate : candidates)
{
if (candidate.desc->type == ProjectionDescription::Type::Normal)
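The selection rule, simplified: aggregate projections are favored first; otherwise the base table's estimated marks plus one become the threshold, so a normal projection still wins on a tie (the "+1" comment above). A standalone sketch of that comparison — the real code also handles aggregate candidates and input-order optimization, both omitted here:

#include <cstddef>
#include <string>
#include <vector>

struct Candidate { std::string name; std::size_t estimated_marks; };

// Mirrors the rule from the hunk above: the base table's estimate gets +1,
// so a projection reading the same number of marks is still preferred.
const Candidate * selectProjection(const std::vector<Candidate> & candidates, std::size_t base_marks)
{
    std::size_t min_sum_marks = base_marks + 1;
    const Candidate * selected = nullptr;
    for (const auto & c : candidates)
    {
        if (c.estimated_marks < min_sum_marks)
        {
            min_sum_marks = c.estimated_marks;
            selected = &c;
        }
    }
    return selected;   // nullptr => read from the base table
}

int main()
{
    std::vector<Candidate> candidates{{"proj_a", 100}, {"proj_b", 80}};
    const auto * best = selectProjection(candidates, 80);   // tie with proj_b
    return best && best->name == "proj_b" ? 0 : 1;          // projection wins the tie
}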

File diff suppressed because it is too large

View File

@@ -17,19 +17,22 @@ struct MergeTreeDataSelectSamplingData
{
bool use_sampling = false;
bool read_nothing = false;
Float64 used_sample_factor = 1.0;
std::shared_ptr<ASTFunction> filter_function;
ActionsDAGPtr filter_expression;
};
struct MergeTreeDataSelectCache
{
RangesInDataParts parts_with_ranges;
MergeTreeDataSelectSamplingData sampling;
std::unique_ptr<ReadFromMergeTree::IndexStats> index_stats;
size_t sum_marks = 0;
size_t sum_ranges = 0;
bool use_cache = false;
};
// struct MergeTreeDataSelectCache
// {
// RangesInDataParts parts_with_ranges;
// MergeTreeDataSelectSamplingData sampling;
// std::unique_ptr<ReadFromMergeTree::IndexStats> index_stats;
// size_t sum_marks = 0;
// size_t sum_ranges = 0;
// bool use_cache = false;
// };
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
/** Executes SELECT queries on data from the merge tree.
*/
@@ -41,7 +44,6 @@ public:
/** When reading, selects a set of parts that covers the desired range of the index.
* max_blocks_number_to_read - if not nullptr, do not read all the parts whose right border is greater than max_block in partition.
*/
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
QueryPlanPtr read(
const Names & column_names,
@@ -53,6 +55,16 @@ public:
QueryProcessingStage::Enum processed_stage,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
size_t estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
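estimateNumMarksToRead runs the same pruning stages as a real read but returns only the surviving mark count instead of building a QueryPlan — which is what makes the MergeTreeDataSelectCache machinery above removable. A toy model of just the final summation over pruned ranges (stand-in range types, not the real RangesInDataParts):

#include <cstddef>
#include <vector>

struct MarkRange { std::size_t begin, end; };
struct PartRanges { std::vector<MarkRange> ranges; };

// After pruning, only the total mark count matters for cost comparison;
// the ranges themselves are discarded and no sources are created.
std::size_t estimateNumMarks(const std::vector<PartRanges> & parts_with_ranges)
{
    std::size_t sum_marks = 0;
    for (const auto & part : parts_with_ranges)
        for (const auto & r : part.ranges)
            sum_marks += r.end - r.begin;
    return sum_marks;
}

int main()
{
    std::vector<PartRanges> parts{{{{0, 8}, {10, 12}}}, {{{0, 4}}}};
    return estimateNumMarks(parts) == 14 ? 0 : 1;
}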
QueryPlanPtr readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
@@ -62,57 +74,56 @@ public:
ContextPtr context,
UInt64 max_block_size,
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr,
MergeTreeDataSelectCache * cache = nullptr) const;
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
private:
const MergeTreeData & data;
Poco::Logger * log;
QueryPlanPtr spreadMarkRangesAmongStreams(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings,
const String & query_id) const;
// QueryPlanPtr spreadMarkRangesAmongStreams(
// RangesInDataParts && parts,
// size_t num_streams,
// const Names & column_names,
// const StorageMetadataPtr & metadata_snapshot,
// UInt64 max_block_size,
// bool use_uncompressed_cache,
// const SelectQueryInfo & query_info,
// const Names & virt_columns,
// const Settings & settings,
// const MergeTreeReaderSettings & reader_settings,
// const String & query_id) const;
/// out_projection - save projection only with columns, requested to read
QueryPlanPtr spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const ActionsDAGPtr & sorting_key_prefix_expr,
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection,
const String & query_id,
const InputOrderInfoPtr & input_order_info) const;
// /// out_projection - save projection only with columns, requested to read
// QueryPlanPtr spreadMarkRangesAmongStreamsWithOrder(
// RangesInDataParts && parts,
// size_t num_streams,
// const Names & column_names,
// const StorageMetadataPtr & metadata_snapshot,
// UInt64 max_block_size,
// bool use_uncompressed_cache,
// const SelectQueryInfo & query_info,
// const ActionsDAGPtr & sorting_key_prefix_expr,
// const Names & virt_columns,
// const Settings & settings,
// const MergeTreeReaderSettings & reader_settings,
// ActionsDAGPtr & out_projection,
// const String & query_id,
// const InputOrderInfoPtr & input_order_info) const;
QueryPlanPtr spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection,
const String & query_id) const;
// QueryPlanPtr spreadMarkRangesAmongStreamsFinal(
// RangesInDataParts && parts,
// size_t num_streams,
// const Names & column_names,
// const StorageMetadataPtr & metadata_snapshot,
// UInt64 max_block_size,
// bool use_uncompressed_cache,
// const SelectQueryInfo & query_info,
// const Names & virt_columns,
// const Settings & settings,
// const MergeTreeReaderSettings & reader_settings,
// ActionsDAGPtr & out_projection,
// const String & query_id) const;
/// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index.
static size_t getApproximateTotalRowsToRead(
@@ -140,7 +151,6 @@ private:
size_t & granules_dropped,
Poco::Logger * log);
public:
struct PartFilterCounters
{
size_t num_initial_selected_parts = 0;
@@ -175,16 +185,36 @@ public:
PartFilterCounters & counters,
Poco::Logger * log);
static RangesInDataParts filterParts(
public:
static std::optional<std::unordered_set<String>> filterPartsByVirtualColumns(
const MergeTreeData & data,
MergeTreeData::DataPartsVector & parts,
const ASTPtr & query,
ContextPtr context);
static void filterPartsByPartition(
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & data,
const SelectQueryInfo & query_info,
ContextPtr & context,
ContextPtr & query_context,
MergeTreeData::DataPartsVector & parts,
const std::optional<std::unordered_set<String>> & part_values,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
Poco::Logger * log,
ReadFromMergeTree::IndexStats & index_stats);
static RangesInDataParts filterPartsByPrimaryKeyAndSkipIndexes(
MergeTreeData::DataPartsVector && parts,
StorageMetadataPtr metadata_snapshot,
SelectQueryInfo & query_info,
const SelectQueryInfo & query_info,
ContextPtr & context,
KeyCondition & key_condition,
const MergeTreeReaderSettings & reader_settings,
Poco::Logger * log,
size_t num_streams,
ReadFromMergeTree::IndexStats & index_stats);
ReadFromMergeTree::IndexStats & index_stats,
bool use_skip_indexes);
static MergeTreeDataSelectSamplingData getSampling(
const ASTSelectQuery & select,
@@ -197,7 +227,7 @@ public:
NamesAndTypesList available_real_columns,
ContextPtr context);
static String checkLimits(MergeTreeData & data, const RangesInDataParts & parts_with_ranges, ContextPtr & context);
static String checkLimits(const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, ContextPtr & context);
};
}

View File

@@ -41,10 +41,7 @@ public:
query_info,
context,
max_block_size,
num_streams,
nullptr,
query_info.projection ? query_info.projection->merge_tree_data_select_base_cache.get()
: query_info.merge_tree_data_select_cache.get()));
num_streams));
return query_plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));

View File

@@ -137,8 +137,8 @@ struct ProjectionCandidate
ReadInOrderOptimizerPtr order_optimizer;
InputOrderInfoPtr input_order_info;
ManyExpressionActions group_by_elements_actions;
std::shared_ptr<MergeTreeDataSelectCache> merge_tree_data_select_base_cache;
std::shared_ptr<MergeTreeDataSelectCache> merge_tree_data_select_projection_cache;
// std::shared_ptr<MergeTreeDataSelectCache> merge_tree_data_select_base_cache;
// std::shared_ptr<MergeTreeDataSelectCache> merge_tree_data_select_projection_cache;
};
/** Query along with some additional data,