mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #66443 from amosbird/cleanup-projections
Clean up projection inside storage snapshot
This commit is contained in:
commit
66f95cd073
@ -46,7 +46,7 @@ public:
|
||||
return;
|
||||
|
||||
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
|
||||
if (!storage->isVirtualColumn(column.name, storage_snapshot->getMetadataForQuery()))
|
||||
if (!storage->isVirtualColumn(column.name, storage_snapshot->metadata))
|
||||
return;
|
||||
|
||||
auto function_node = std::make_shared<FunctionNode>("shardNum");
|
||||
|
@ -1158,7 +1158,8 @@ bool TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
|
||||
}
|
||||
}
|
||||
|
||||
has_virtual_shard_num = is_remote_storage && storage->isVirtualColumn("_shard_num", storage_snapshot->getMetadataForQuery()) && virtuals->has("_shard_num");
|
||||
has_virtual_shard_num
|
||||
= is_remote_storage && storage->isVirtualColumn("_shard_num", storage_snapshot->metadata) && virtuals->has("_shard_num");
|
||||
}
|
||||
|
||||
/// Collect missed object subcolumns
|
||||
|
@ -417,20 +417,20 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
|
||||
/// We evaluate sampling for Merge lazily so we need to get all the columns
|
||||
if (storage_snapshot->storage.getName() == "Merge")
|
||||
{
|
||||
const auto columns = storage_snapshot->getMetadataForQuery()->getColumns().getAll();
|
||||
const auto columns = storage_snapshot->metadata->getColumns().getAll();
|
||||
for (const auto & column : columns)
|
||||
required_columns.insert(column.name);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto columns_required_for_sampling = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForSampling();
|
||||
auto columns_required_for_sampling = storage_snapshot->metadata->getColumnsRequiredForSampling();
|
||||
required_columns.insert(columns_required_for_sampling.begin(), columns_required_for_sampling.end());
|
||||
}
|
||||
}
|
||||
|
||||
if (table_expression_modifiers->hasFinal())
|
||||
{
|
||||
auto columns_required_for_final = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForFinal();
|
||||
auto columns_required_for_final = storage_snapshot->metadata->getColumnsRequiredForFinal();
|
||||
required_columns.insert(columns_required_for_final.begin(), columns_required_for_final.end());
|
||||
}
|
||||
}
|
||||
|
@ -757,9 +757,7 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
|
||||
else
|
||||
{
|
||||
auto storage_snapshot = reading->getStorageSnapshot();
|
||||
auto proj_snapshot = std::make_shared<StorageSnapshot>(storage_snapshot->storage, storage_snapshot->metadata);
|
||||
proj_snapshot->addProjection(best_candidate->projection);
|
||||
|
||||
auto proj_snapshot = std::make_shared<StorageSnapshot>(storage_snapshot->storage, best_candidate->projection->metadata);
|
||||
auto projection_query_info = query_info;
|
||||
projection_query_info.prewhere_info = nullptr;
|
||||
projection_query_info.filter_actions_dag = nullptr;
|
||||
|
@ -193,9 +193,7 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
|
||||
}
|
||||
|
||||
auto storage_snapshot = reading->getStorageSnapshot();
|
||||
auto proj_snapshot = std::make_shared<StorageSnapshot>(storage_snapshot->storage, storage_snapshot->metadata);
|
||||
proj_snapshot->addProjection(best_candidate->projection);
|
||||
|
||||
auto proj_snapshot = std::make_shared<StorageSnapshot>(storage_snapshot->storage, best_candidate->projection->metadata);
|
||||
auto query_info_copy = query_info;
|
||||
query_info_copy.prewhere_info = nullptr;
|
||||
|
||||
|
@ -285,7 +285,6 @@ ReadFromMergeTree::ReadFromMergeTree(
|
||||
, all_column_names(std::move(all_column_names_))
|
||||
, data(data_)
|
||||
, actions_settings(ExpressionActionsSettings::fromContext(context_))
|
||||
, metadata_for_reading(storage_snapshot->getMetadataForQuery())
|
||||
, block_size{
|
||||
.max_block_size_rows = max_block_size_,
|
||||
.preferred_block_size_bytes = context->getSettingsRef().preferred_block_size_bytes,
|
||||
@ -327,7 +326,7 @@ ReadFromMergeTree::ReadFromMergeTree(
|
||||
|
||||
updateSortDescriptionForOutputStream(
|
||||
*output_stream,
|
||||
storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(),
|
||||
storage_snapshot->metadata->getSortingKeyColumns(),
|
||||
getSortDirection(),
|
||||
query_info.input_order_info,
|
||||
prewhere_info,
|
||||
@ -782,7 +781,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_
|
||||
Names in_order_column_names_to_read(column_names);
|
||||
|
||||
/// Add columns needed to calculate the sorting expression
|
||||
for (const auto & column_name : metadata_for_reading->getColumnsRequiredForSortingKey())
|
||||
for (const auto & column_name : storage_snapshot->metadata->getColumnsRequiredForSortingKey())
|
||||
{
|
||||
if (column_names_set.contains(column_name))
|
||||
continue;
|
||||
@ -802,10 +801,10 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_
|
||||
info.use_uncompressed_cache);
|
||||
};
|
||||
|
||||
auto sorting_expr = metadata_for_reading->getSortingKey().expression;
|
||||
auto sorting_expr = storage_snapshot->metadata->getSortingKey().expression;
|
||||
|
||||
SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
|
||||
metadata_for_reading->getPrimaryKey(),
|
||||
storage_snapshot->metadata->getPrimaryKey(),
|
||||
std::move(sorting_expr),
|
||||
std::move(parts_with_ranges),
|
||||
num_streams,
|
||||
@ -883,7 +882,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
|
||||
if (prewhere_info)
|
||||
{
|
||||
NameSet sorting_columns;
|
||||
for (const auto & column : metadata_for_reading->getSortingKey().expression->getRequiredColumnsWithTypes())
|
||||
for (const auto & column : storage_snapshot->metadata->getSortingKey().expression->getRequiredColumnsWithTypes())
|
||||
sorting_columns.insert(column.name);
|
||||
|
||||
have_input_columns_removed_after_prewhere = restorePrewhereInputs(*prewhere_info, sorting_columns);
|
||||
@ -1038,12 +1037,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
|
||||
if (need_preliminary_merge || output_each_partition_through_separate_port)
|
||||
{
|
||||
size_t prefix_size = input_order_info->used_prefix_of_sorting_key_size;
|
||||
auto order_key_prefix_ast = metadata_for_reading->getSortingKey().expression_list_ast->clone();
|
||||
auto order_key_prefix_ast = storage_snapshot->metadata->getSortingKey().expression_list_ast->clone();
|
||||
order_key_prefix_ast->children.resize(prefix_size);
|
||||
|
||||
auto syntax_result = TreeRewriter(context).analyze(order_key_prefix_ast, metadata_for_reading->getColumns().getAllPhysical());
|
||||
auto syntax_result = TreeRewriter(context).analyze(order_key_prefix_ast, storage_snapshot->metadata->getColumns().getAllPhysical());
|
||||
auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActionsDAG(false);
|
||||
const auto & sorting_columns = metadata_for_reading->getSortingKey().column_names;
|
||||
const auto & sorting_columns = storage_snapshot->metadata->getSortingKey().column_names;
|
||||
|
||||
SortDescription sort_description;
|
||||
sort_description.compile_sort_description = settings.compile_sort_description;
|
||||
@ -1150,7 +1149,7 @@ bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
|
||||
if (settings.do_not_merge_across_partitions_select_final.changed)
|
||||
return settings.do_not_merge_across_partitions_select_final;
|
||||
|
||||
if (!metadata_for_reading->hasPrimaryKey() || !metadata_for_reading->hasPartitionKey())
|
||||
if (!storage_snapshot->metadata->hasPrimaryKey() || !storage_snapshot->metadata->hasPartitionKey())
|
||||
return false;
|
||||
|
||||
/** To avoid merging parts across partitions we want result of partition key expression for
|
||||
@ -1160,11 +1159,11 @@ bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
|
||||
* in primary key, then for same primary key column values, result of partition key expression
|
||||
* will be the same.
|
||||
*/
|
||||
const auto & partition_key_expression = metadata_for_reading->getPartitionKey().expression;
|
||||
const auto & partition_key_expression = storage_snapshot->metadata->getPartitionKey().expression;
|
||||
if (partition_key_expression->getActionsDAG().hasNonDeterministic())
|
||||
return false;
|
||||
|
||||
const auto & primary_key_columns = metadata_for_reading->getPrimaryKey().column_names;
|
||||
const auto & primary_key_columns = storage_snapshot->metadata->getPrimaryKey().column_names;
|
||||
NameSet primary_key_columns_set(primary_key_columns.begin(), primary_key_columns.end());
|
||||
|
||||
const auto & partition_key_required_columns = partition_key_expression->getRequiredColumns();
|
||||
@ -1217,12 +1216,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
|
||||
/// we will store lonely parts with level > 0 to use parallel select on them.
|
||||
RangesInDataParts non_intersecting_parts_by_primary_key;
|
||||
|
||||
auto sorting_expr = metadata_for_reading->getSortingKey().expression;
|
||||
auto sorting_expr = storage_snapshot->metadata->getSortingKey().expression;
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
NameSet sorting_columns;
|
||||
for (const auto & column : metadata_for_reading->getSortingKey().expression->getRequiredColumnsWithTypes())
|
||||
for (const auto & column : storage_snapshot->metadata->getSortingKey().expression->getRequiredColumnsWithTypes())
|
||||
sorting_columns.insert(column.name);
|
||||
restorePrewhereInputs(*prewhere_info, sorting_columns);
|
||||
}
|
||||
@ -1253,7 +1252,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
|
||||
if (new_parts.empty())
|
||||
continue;
|
||||
|
||||
if (num_streams > 1 && metadata_for_reading->hasPrimaryKey())
|
||||
if (num_streams > 1 && storage_snapshot->metadata->hasPrimaryKey())
|
||||
{
|
||||
// Let's split parts into non intersecting parts ranges and layers to ensure data parallelism of FINAL.
|
||||
auto in_order_reading_step_getter = [this, &column_names, &info](auto parts)
|
||||
@ -1273,7 +1272,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
|
||||
data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
|
||||
|
||||
SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
|
||||
metadata_for_reading->getPrimaryKey(),
|
||||
storage_snapshot->metadata->getPrimaryKey(),
|
||||
sorting_expr,
|
||||
std::move(new_parts),
|
||||
num_streams,
|
||||
@ -1305,7 +1304,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
|
||||
if (pipes.empty())
|
||||
continue;
|
||||
|
||||
Names sort_columns = metadata_for_reading->getSortingKeyColumns();
|
||||
Names sort_columns = storage_snapshot->metadata->getSortingKeyColumns();
|
||||
SortDescription sort_description;
|
||||
sort_description.compile_sort_description = settings.compile_sort_description;
|
||||
sort_description.min_count_to_compile_sort_description = settings.min_count_to_compile_sort_description;
|
||||
@ -1313,7 +1312,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
|
||||
size_t sort_columns_size = sort_columns.size();
|
||||
sort_description.reserve(sort_columns_size);
|
||||
|
||||
Names partition_key_columns = metadata_for_reading->getPartitionKey().column_names;
|
||||
Names partition_key_columns = storage_snapshot->metadata->getPartitionKey().column_names;
|
||||
|
||||
for (size_t i = 0; i < sort_columns_size; ++i)
|
||||
sort_description.emplace_back(sort_columns[i], 1, 1);
|
||||
@ -1370,7 +1369,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
|
||||
return selectRangesToRead(
|
||||
std::move(parts),
|
||||
std::move(alter_conversions),
|
||||
metadata_for_reading,
|
||||
storage_snapshot->metadata,
|
||||
query_info,
|
||||
context,
|
||||
requested_num_streams,
|
||||
@ -1534,7 +1533,7 @@ void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes)
|
||||
prepared_parts,
|
||||
context,
|
||||
query_info,
|
||||
metadata_for_reading);
|
||||
storage_snapshot->metadata);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1703,7 +1702,7 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
|
||||
|
||||
/// update sort info for output stream
|
||||
SortDescription sort_description;
|
||||
const Names & sorting_key_columns = metadata_for_reading->getSortingKeyColumns();
|
||||
const Names & sorting_key_columns = storage_snapshot->metadata->getSortingKeyColumns();
|
||||
const Block & header = output_stream->header;
|
||||
const int sort_direction = getSortDirection();
|
||||
for (const auto & column_name : sorting_key_columns)
|
||||
@ -1745,7 +1744,7 @@ void ReadFromMergeTree::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info
|
||||
|
||||
updateSortDescriptionForOutputStream(
|
||||
*output_stream,
|
||||
storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(),
|
||||
storage_snapshot->metadata->getSortingKeyColumns(),
|
||||
getSortDirection(),
|
||||
query_info.input_order_info,
|
||||
prewhere_info,
|
||||
@ -1871,7 +1870,7 @@ Pipe ReadFromMergeTree::spreadMarkRanges(
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimization isn't supposed to be used for queries with final");
|
||||
|
||||
/// Add columns needed to calculate the sorting expression and the sign.
|
||||
for (const auto & column : metadata_for_reading->getColumnsRequiredForSortingKey())
|
||||
for (const auto & column : storage_snapshot->metadata->getColumnsRequiredForSortingKey())
|
||||
{
|
||||
if (!names.contains(column))
|
||||
{
|
||||
@ -1965,10 +1964,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
|
||||
fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
|
||||
}
|
||||
context->getQueryContext()->addQueryAccessInfo(partition_names);
|
||||
|
||||
if (storage_snapshot->projection)
|
||||
context->getQueryContext()->addQueryAccessInfo(
|
||||
Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name});
|
||||
}
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);
|
||||
|
@ -171,7 +171,7 @@ public:
|
||||
|
||||
AnalysisResultPtr selectRangesToRead(bool find_exact_ranges = false) const;
|
||||
|
||||
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
|
||||
StorageMetadataPtr getStorageMetadata() const { return storage_snapshot->metadata; }
|
||||
|
||||
/// Returns `false` if requested reading cannot be performed.
|
||||
bool requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
|
||||
@ -216,8 +216,6 @@ private:
|
||||
const MergeTreeData & data;
|
||||
ExpressionActionsSettings actions_settings;
|
||||
|
||||
StorageMetadataPtr metadata_for_reading;
|
||||
|
||||
const MergeTreeReadTask::BlockSizeParams block_size;
|
||||
|
||||
size_t requested_num_streams;
|
||||
|
@ -75,7 +75,7 @@ void IMergeTreeReader::fillVirtualColumns(Columns & columns, size_t rows) const
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Filling of virtual columns is supported only for LoadedMergeTreeDataPartInfoForReader");
|
||||
|
||||
const auto & data_part = loaded_part_info->getDataPart();
|
||||
const auto & storage_columns = storage_snapshot->getMetadataForQuery()->getColumns();
|
||||
const auto & storage_columns = storage_snapshot->metadata->getColumns();
|
||||
const auto & virtual_columns = storage_snapshot->virtual_columns;
|
||||
|
||||
auto it = requested_columns.begin();
|
||||
|
@ -71,8 +71,7 @@ bool injectRequiredColumnsRecursively(
|
||||
|
||||
/// Column doesn't have default value and don't exist in part
|
||||
/// don't need to add to required set.
|
||||
auto metadata_snapshot = storage_snapshot->getMetadataForQuery();
|
||||
const auto column_default = metadata_snapshot->getColumns().getDefault(column_name);
|
||||
const auto column_default = storage_snapshot->metadata->getColumns().getDefault(column_name);
|
||||
if (!column_default)
|
||||
return false;
|
||||
|
||||
|
@ -7141,12 +7141,7 @@ UInt64 MergeTreeData::estimateNumberOfRowsToRead(
|
||||
|
||||
MergeTreeDataSelectExecutor reader(*this);
|
||||
auto result_ptr = reader.estimateNumMarksToRead(
|
||||
parts,
|
||||
storage_snapshot->getMetadataForQuery()->getColumns().getAll().getNames(),
|
||||
storage_snapshot->metadata,
|
||||
query_info,
|
||||
query_context,
|
||||
query_context->getSettingsRef().max_threads);
|
||||
parts, {}, storage_snapshot->metadata, query_info, query_context, query_context->getSettingsRef().max_threads);
|
||||
|
||||
UInt64 total_rows = result_ptr->selected_rows;
|
||||
if (query_info.trivial_limit > 0 && query_info.trivial_limit < total_rows)
|
||||
|
@ -602,7 +602,7 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
|
||||
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
|
||||
ASTPtr column_expr;
|
||||
|
||||
auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock();
|
||||
auto sample_block = merge_storage_snapshot->metadata->getSampleBlock();
|
||||
|
||||
for (const auto & column : real_column_names)
|
||||
{
|
||||
|
@ -63,7 +63,6 @@ std::shared_ptr<StorageSnapshot> StorageSnapshot::clone(DataPtr data_) const
|
||||
{
|
||||
auto res = std::make_shared<StorageSnapshot>(storage, metadata, object_columns);
|
||||
|
||||
res->projection = projection;
|
||||
res->data = std::move(data_);
|
||||
|
||||
return res;
|
||||
@ -79,7 +78,7 @@ ColumnsDescription StorageSnapshot::getAllColumnsDescription() const
|
||||
|
||||
NamesAndTypesList StorageSnapshot::getColumns(const GetColumnsOptions & options) const
|
||||
{
|
||||
auto all_columns = getMetadataForQuery()->getColumns().get(options);
|
||||
auto all_columns = metadata->getColumns().get(options);
|
||||
|
||||
if (options.with_extended_objects)
|
||||
extendObjectColumns(all_columns, object_columns, options.with_subcolumns);
|
||||
@ -113,7 +112,7 @@ NamesAndTypesList StorageSnapshot::getColumnsByNames(const GetColumnsOptions & o
|
||||
|
||||
std::optional<NameAndTypePair> StorageSnapshot::tryGetColumn(const GetColumnsOptions & options, const String & column_name) const
|
||||
{
|
||||
const auto & columns = getMetadataForQuery()->getColumns();
|
||||
const auto & columns = metadata->getColumns();
|
||||
auto column = columns.tryGetColumn(options, column_name);
|
||||
if (column && (!column->type->hasDynamicSubcolumnsDeprecated() || !options.with_extended_objects))
|
||||
return column;
|
||||
@ -189,7 +188,7 @@ Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) cons
|
||||
{
|
||||
Block res;
|
||||
|
||||
const auto & columns = getMetadataForQuery()->getColumns();
|
||||
const auto & columns = metadata->getColumns();
|
||||
for (const auto & column_name : column_names)
|
||||
{
|
||||
auto column = columns.tryGetColumnOrSubcolumn(GetColumnsOptions::All, column_name);
|
||||
@ -221,7 +220,7 @@ Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) cons
|
||||
ColumnsDescription StorageSnapshot::getDescriptionForColumns(const Names & column_names) const
|
||||
{
|
||||
ColumnsDescription res;
|
||||
const auto & columns = getMetadataForQuery()->getColumns();
|
||||
const auto & columns = metadata->getColumns();
|
||||
for (const auto & name : column_names)
|
||||
{
|
||||
auto column = columns.tryGetColumnOrSubcolumnDescription(GetColumnsOptions::All, name);
|
||||
@ -257,7 +256,7 @@ namespace
|
||||
|
||||
void StorageSnapshot::check(const Names & column_names) const
|
||||
{
|
||||
const auto & columns = getMetadataForQuery()->getColumns();
|
||||
const auto & columns = metadata->getColumns();
|
||||
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns();
|
||||
|
||||
if (column_names.empty())
|
||||
|
@ -30,9 +30,6 @@ struct StorageSnapshot
|
||||
using DataPtr = std::unique_ptr<Data>;
|
||||
DataPtr data;
|
||||
|
||||
/// Projection that is used in query.
|
||||
mutable const ProjectionDescription * projection = nullptr;
|
||||
|
||||
StorageSnapshot(
|
||||
const IStorage & storage_,
|
||||
StorageMetadataPtr metadata_);
|
||||
@ -82,11 +79,6 @@ struct StorageSnapshot
|
||||
void check(const Names & column_names) const;
|
||||
|
||||
DataTypePtr getConcreteType(const String & column_name) const;
|
||||
|
||||
void addProjection(const ProjectionDescription * projection_) const { projection = projection_; }
|
||||
|
||||
/// If we have a projection then we should use its metadata.
|
||||
StorageMetadataPtr getMetadataForQuery() const { return projection ? projection->metadata : metadata; }
|
||||
};
|
||||
|
||||
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
|
||||
|
Loading…
Reference in New Issue
Block a user